Merge "Allow kinesis-events to be configured as events-broker" into stable-3.3
diff --git a/setup_local_env/README.md b/setup_local_env/README.md
index 5b99996..0b1d00c 100644
--- a/setup_local_env/README.md
+++ b/setup_local_env/README.md
@@ -4,7 +4,8 @@
 The environment is composed by:
 
 - 2 gerrit instances deployed by default in /tmp
-- 1 kafka node and 1 zookeeper node
+- 1 zookeeper node
+- 1 Broker node (either kafka or kinesis)
 - 1 HA-PROXY
 
 ## Requirements
@@ -14,15 +15,27 @@
 - wget
 - envsubst
 - haproxy
+- aws-cli (only when broker_type is "kinesis")
 
 ## Examples
 
-Simplest setup with all default values and cleanup previous deployment
+Simplest setup with all default values and cleanup previous deployment. This
+will deploy kafka broker
 
 ```bash
 sh setup_local_env/setup.sh --release-war-file /path/to/gerrit.war --multisite-lib-file /path/to/multi-site.jar
 ```
 
+Deploy Kinesis broker
+
+```bash
+sh setup_local_env/setup.sh \
+    --release-war-file /path/to/gerrit.war \
+    --multisite-lib-file /path/to/multi-site.jar \
+    --broker-type kinesis
+```
+
+
 Cleanup the previous deployments
 
 ```bash
@@ -59,6 +72,8 @@
 [--just-cleanup-env]            Cleans up previous deployment; default false
 
 [--enabled-https]               Enabled https; default true
+
+[--broker_type]                 events broker type; either 'kafka' or 'kinesis'. Default 'kafka'
 ```
 
 ## Limitations
diff --git a/setup_local_env/configs/gerrit.config b/setup_local_env/configs/gerrit.config
index f9eca89..0f19578 100644
--- a/setup_local_env/configs/gerrit.config
+++ b/setup_local_env/configs/gerrit.config
@@ -41,13 +41,20 @@
     directory = $FAKE_NFS
 [plugin "kafka-events"]
     sendAsync = true
-    bootstrapServers = localhost:$KAFKA_PORT
-    groupId = $KAFKA_GROUP_ID
+    bootstrapServers = localhost:$BROKER_PORT
+    groupId = $GROUP_ID
     numberOfSubscribers = 6
     securityProtocol = PLAINTEXT
     pollingIntervalMs = 1000
     enableAutoCommit = true
     autoCommitIntervalMs = 1000
     autoOffsetReset = latest
+[plugin "kinesis-events"]
+    numberOfSubscribers = 6
+    pollingIntervalMs = 1000
+    region = us-east-1
+    endpoint = http://localhost:$BROKER_PORT
+    applicationName = $GROUP_ID
+    initialPosition = trim_horizon
 [plugin "metrics-reporter-prometheus"]
     prometheusBearerToken = token
diff --git a/setup_local_env/docker-compose.yaml b/setup_local_env/docker-compose-core.yaml
similarity index 61%
rename from setup_local_env/docker-compose.yaml
rename to setup_local_env/docker-compose-core.yaml
index c386d46..dab168d 100644
--- a/setup_local_env/docker-compose.yaml
+++ b/setup_local_env/docker-compose-core.yaml
@@ -5,15 +5,8 @@
     ports:
       - "2181:2181"
     container_name: zk_test_node
-  kafka:
-    image: wurstmeister/kafka:2.12-2.1.0
-    ports:
-      - "9092:9092"
-    container_name: kafka_test_node
-    environment:
-      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
-      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
   prometheus:
+    container_name: prometheus_test_node
     image: prom/prometheus:v2.16.0
     user: root
     volumes:
diff --git a/setup_local_env/docker-compose-kafka.yaml b/setup_local_env/docker-compose-kafka.yaml
new file mode 100644
index 0000000..8a31502
--- /dev/null
+++ b/setup_local_env/docker-compose-kafka.yaml
@@ -0,0 +1,15 @@
+version: '3'
+services:
+  kafka:
+    image: wurstmeister/kafka:2.12-2.1.0
+    ports:
+      - "9092:9092"
+    container_name: kafka_test_node
+    environment:
+      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+    networks:
+      - setup_local_env_default
+networks:
+  setup_local_env_default:
+    external: true
diff --git a/setup_local_env/docker-compose-kinesis.yaml b/setup_local_env/docker-compose-kinesis.yaml
new file mode 100644
index 0000000..321d760
--- /dev/null
+++ b/setup_local_env/docker-compose-kinesis.yaml
@@ -0,0 +1,16 @@
+version: '3'
+services:
+  kinesis:
+    image: localstack/localstack:0.12.8
+    ports:
+      - "4566:4566"
+      - "4751:4751"
+    container_name: kinesis_test_node
+    environment:
+      SERVICES: dynamodb,cloudwatch,kinesis
+      AWS_REGION: us-east-1
+    networks:
+      - setup_local_env_default
+networks:
+  setup_local_env_default:
+    external: true
\ No newline at end of file
diff --git a/setup_local_env/setup.sh b/setup_local_env/setup.sh
index f25281c..5c9de6f 100755
--- a/setup_local_env/setup.sh
+++ b/setup_local_env/setup.sh
@@ -30,6 +30,9 @@
   type wget >/dev/null 2>&1 || { echo >&2 "Require wget but it's not installed. Aborting."; exit 1; }
   type envsubst >/dev/null 2>&1 || { echo >&2 "Require envsubst but it's not installed. Aborting."; exit 1; }
   type openssl >/dev/null 2>&1 || { echo >&2 "Require openssl but it's not installed. Aborting."; exit 1; }
+  if [ "$BROKER_TYPE" = "kinesis" ];  then
+    type aws >/dev/null 2>&1 || { echo >&2 "Require aws-cli but it's not installed. Aborting."; exit 1; }
+  fi
 }
 
 function get_replication_url {
@@ -67,7 +70,7 @@
     export GERRIT_HOSTNAME=$7
     export REPLICATION_HOSTNAME=$8
     export REMOTE_DEBUG_PORT=$9
-    export KAFKA_GROUP_ID=${10}
+    export GROUP_ID=${10}
     export REPLICATION_URL=$(get_replication_url $REPLICATION_LOCATION_TEST_SITE $REPLICATION_HOSTNAME)
 
     echo "Replacing variables for file $file and copying to $CONFIG_TEST_SITE/$file_name"
@@ -99,10 +102,15 @@
   haproxy -f $HA_PROXY_CONFIG_DIR/haproxy.cfg &
 }
 
-function deploy_config_files {
-  # KAFKA configuration
-  export KAFKA_PORT=9092
+function export_broker_port {
+  if [ "$BROKER_TYPE" = "kinesis" ]; then
+    export BROKER_PORT=4566
+  elif [ "$BROKER_TYPE" =  "kafka" ]; then
+    export BROKER_PORT=9092
+  fi
+}
 
+function deploy_config_files {
   # ZK configuration
   export ZK_PORT=2181
 
@@ -112,21 +120,21 @@
   GERRIT_SITE1_SSHD_PORT=$3
   CONFIG_TEST_SITE_1=$LOCATION_TEST_SITE_1/etc
   GERRIT_SITE1_REMOTE_DEBUG_PORT="5005"
-  GERRIT_SITE1_KAFKA_GROUP_ID="instance-1"
+  GERRIT_SITE1_GROUP_ID="instance-1"
   # SITE 2
   GERRIT_SITE2_HOSTNAME=$4
   GERRIT_SITE2_HTTPD_PORT=$5
   GERRIT_SITE2_SSHD_PORT=$6
   CONFIG_TEST_SITE_2=$LOCATION_TEST_SITE_2/etc
   GERRIT_SITE2_REMOTE_DEBUG_PORT="5006"
-  GERRIT_SITE2_KAFKA_GROUP_ID="instance-2"
+  GERRIT_SITE2_GROUP_ID="instance-2"
 
   # Set config SITE1
-  copy_config_files $CONFIG_TEST_SITE_1 $GERRIT_SITE1_HTTPD_PORT $LOCATION_TEST_SITE_1 $GERRIT_SITE1_SSHD_PORT $GERRIT_SITE2_HTTPD_PORT $LOCATION_TEST_SITE_2 $GERRIT_SITE1_HOSTNAME $GERRIT_SITE2_HOSTNAME $GERRIT_SITE1_REMOTE_DEBUG_PORT $GERRIT_SITE1_KAFKA_GROUP_ID
+  copy_config_files $CONFIG_TEST_SITE_1 $GERRIT_SITE1_HTTPD_PORT $LOCATION_TEST_SITE_1 $GERRIT_SITE1_SSHD_PORT $GERRIT_SITE2_HTTPD_PORT $LOCATION_TEST_SITE_2 $GERRIT_SITE1_HOSTNAME $GERRIT_SITE2_HOSTNAME $GERRIT_SITE1_REMOTE_DEBUG_PORT $GERRIT_SITE1_GROUP_ID
 
 
   # Set config SITE2
-  copy_config_files $CONFIG_TEST_SITE_2 $GERRIT_SITE2_HTTPD_PORT $LOCATION_TEST_SITE_2 $GERRIT_SITE2_SSHD_PORT $GERRIT_SITE1_HTTPD_PORT $LOCATION_TEST_SITE_1 $GERRIT_SITE1_HOSTNAME $GERRIT_SITE2_HOSTNAME $GERRIT_SITE2_REMOTE_DEBUG_PORT $GERRIT_SITE2_KAFKA_GROUP_ID
+  copy_config_files $CONFIG_TEST_SITE_2 $GERRIT_SITE2_HTTPD_PORT $LOCATION_TEST_SITE_2 $GERRIT_SITE2_SSHD_PORT $GERRIT_SITE1_HTTPD_PORT $LOCATION_TEST_SITE_1 $GERRIT_SITE1_HOSTNAME $GERRIT_SITE2_HOSTNAME $GERRIT_SITE2_REMOTE_DEBUG_PORT $GERRIT_SITE2_GROUP_ID
 }
 
 function is_docker_desktop {
@@ -142,13 +150,32 @@
   fi
 }
 
+function create_kinesis_streams {
+  for stream in "gerrit_batch_index" "gerrit_cache_eviction" "gerrit_index" "gerrit_list_project" "gerrit_stream" "gerrit_web_session" "gerrit"
+  do
+    create_kinesis_stream $stream
+  done
+}
+
+function create_kinesis_stream {
+  local stream=$1
+
+  echo "[KINESIS] Create stream $stream"
+  until aws --endpoint-url=http://localhost:$BROKER_PORT kinesis create-stream --shard-count 1 --stream-name "$stream"
+  do
+      echo "[KINESIS stream $stream] Creation failed. Retrying in 5 seconds..."
+      sleep 5s
+  done
+}
 
 function cleanup_environment {
   echo "Killing existing HA-PROXY setup"
   kill $(ps -ax | grep haproxy | grep "gerrit_setup/ha-proxy-config" | awk '{print $1}') 2> /dev/null
 
-  echo "Stopping docker containers"
-  docker-compose -f $SCRIPT_DIR/docker-compose.yaml down 2> /dev/null
+  echo "Stopping $BROKER_TYPE docker container"
+  docker-compose -f "${SCRIPT_DIR}/docker-compose-${BROKER_TYPE}.yaml" down 2> /dev/null
+  echo "Stopping core docker containers"
+  docker-compose -f "${SCRIPT_DIR}/docker-compose-core.yaml" down 2> /dev/null
 
   echo "Stopping GERRIT instances"
   $1/bin/gerrit.sh stop 2> /dev/null
@@ -158,8 +185,32 @@
   rm -rf $3 2> /dev/null
 }
 
-function check_if_kafka_is_running {
-  echo $(docker inspect kafka_test_node 2> /dev/null | grep '"Running": true' | wc -l)
+function check_if_container_is_running {
+  local container=$1;
+  echo $(docker inspect "$container" 2> /dev/null | grep '"Running": true' | wc -l)
+}
+
+function ensure_docker_compose_is_up_and_running {
+  local log_label=$1
+  local container_name=$2
+  local docker_compose_file=$3
+
+  local is_container_running=$(check_if_container_is_running "$container_name")
+  if [ "$is_container_running" -lt 1 ];then
+    echo "[$log_label] Starting docker containers"
+    docker-compose -f "${SCRIPT_DIR}/${docker_compose_file}" up -d
+
+    echo "[$log_label] Waiting for docker containers to start..."
+    while [[ $(check_if_container_is_running "$container_name") -lt 1 ]];do sleep 10s; done
+  else
+    echo "[$log_label] Containers already running, nothing to do"
+  fi
+}
+
+function prepare_broker_data {
+  if [ "$BROKER_TYPE" = "kinesis" ]; then
+    create_kinesis_streams
+  fi
 }
 
 while [ $# -ne 0 ]
@@ -194,6 +245,8 @@
     echo
     echo "[--enabled-https]               Enabled https; default true"
     echo
+    echo "[--broker-type]                 events broker type; either 'kafka' or 'kinesis'. Default 'kafka'"
+    echo
     exit 0
   ;;
   "--new-deployment")
@@ -281,6 +334,15 @@
     shift
     shift
   ;;
+ "--broker-type" )
+    BROKER_TYPE=$2
+    shift
+    shift
+    if [ ! "$BROKER_TYPE" = "kafka" ] && [ ! "$BROKER_TYPE" = "kinesis" ]; then
+      echo >&2 "broker type: '$BROKER_TYPE' not valid. Please supply 'kafka' or 'kinesis'. Aborting"
+      exit 1
+    fi
+  ;;
   *     )
     echo "Unknown option argument: $1"
     shift
@@ -309,6 +371,7 @@
 export REPLICATION_DELAY_SEC=${REPLICATION_DELAY_SEC:-"5"}
 export SSH_ADVERTISED_PORT=${SSH_ADVERTISED_PORT:-"29418"}
 HTTPS_ENABLED=${HTTPS_ENABLED:-"false"}
+BROKER_TYPE=${BROKER_TYPE:-"kafka"}
 
 export COMMON_LOCATION=$DEPLOYMENT_LOCATION/gerrit_setup
 LOCATION_TEST_SITE_1=$COMMON_LOCATION/instance-1
@@ -367,10 +430,19 @@
   -O $DEPLOYMENT_LOCATION/events-broker.jar || { echo >&2 "Cannot download events-broker library: Check internet connection. Abort\
 ing"; exit 1; }
 
+if [ "$BROKER_TYPE" = "kafka" ]; then
 echo "Downloading kafka-events plugin $GERRIT_BRANCH"
   wget $GERRIT_CI/plugin-kafka-events-bazel-$GERRIT_BRANCH/$LAST_BUILD/kafka-events/kafka-events.jar \
   -O $DEPLOYMENT_LOCATION/kafka-events.jar || { echo >&2 "Cannot download kafka-events plugin: Check internet connection. Abort\
 ing"; exit 1; }
+fi
+
+if [ "$BROKER_TYPE" = "kinesis" ]; then
+echo "Downloading kinesis-events plugin master (TODO: replace with $GERRIT_BRANCH, once we have build)"
+  wget $GERRIT_CI/plugin-kinesis-events-gh-bazel-master/$LAST_BUILD/kinesis-events/kinesis-events.jar \
+  -O $DEPLOYMENT_LOCATION/kinesis-events.jar || { echo >&2 "Cannot download kinesis-events plugin: Check internet connection. Abort\
+ing"; exit 1; }
+fi
 
 echo "Downloading metrics-reporter-prometheus plugin $GERRIT_BRANCH"
   wget $GERRIT_CI/plugin-metrics-reporter-prometheus-bazel-master-$GERRIT_BRANCH/$LAST_BUILD/metrics-reporter-prometheus/metrics-reporter-prometheus.jar \
@@ -422,8 +494,8 @@
   echo "Copy events broker library"
   cp -f $DEPLOYMENT_LOCATION/events-broker.jar $LOCATION_TEST_SITE_1/lib/events-broker.jar
 
-  echo "Copy kafka events plugin"
-  cp -f $DEPLOYMENT_LOCATION/kafka-events.jar $LOCATION_TEST_SITE_1/plugins/kafka-events.jar
+  echo "Copy $BROKER_TYPE events plugin"
+  cp -f $DEPLOYMENT_LOCATION/$BROKER_TYPE-events.jar $LOCATION_TEST_SITE_1/plugins/$BROKER_TYPE-events.jar
 
   echo "Copy metrics-reporter-prometheus plugin"
   cp -f $DEPLOYMENT_LOCATION/metrics-reporter-prometheus.jar $LOCATION_TEST_SITE_1/plugins/metrics-reporter-prometheus.jar
@@ -455,14 +527,10 @@
 
 cat $SCRIPT_DIR/configs/prometheus.yml | envsubst > $COMMON_LOCATION/prometheus.yml
 
-IS_KAFKA_RUNNING=$(check_if_kafka_is_running)
-if [ $IS_KAFKA_RUNNING -lt 1 ];then
-
-  echo "Starting zk and kafka"
-  docker-compose -f $SCRIPT_DIR/docker-compose.yaml up -d
-  echo "Waiting for kafka to start..."
-  while [[ $(check_if_kafka_is_running) -lt 1 ]];do sleep 10s; done
-fi
+export_broker_port
+ensure_docker_compose_is_up_and_running "core" "prometheus_test_node" "docker-compose-core.yaml"
+ensure_docker_compose_is_up_and_running "$BROKER_TYPE" "${BROKER_TYPE}_test_node" "docker-compose-$BROKER_TYPE.yaml"
+prepare_broker_data
 
 echo "Re-deploying configuration files"
 deploy_config_files $GERRIT_1_HOSTNAME $GERRIT_1_HTTPD_PORT $GERRIT_1_SSHD_PORT $GERRIT_2_HOSTNAME $GERRIT_2_HTTPD_PORT $GERRIT_2_SSHD_PORT