blob: eaf6ee18bd999253df1e037287b231307b472c3d [file] [log] [blame]
# pylint: disable=W0613
# Copyright (C) 2019 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from abc import ABC, abstractmethod
from argparse import ArgumentTypeError
import base64
import json
import re
import subprocess
import warnings
from kubernetes import client, config
import pytest
from helm import Helm
HELM_SERVICE_ACCOUNT_NAME = "helm"
HELM_SERVICE_ACCOUNT_NAMESPACE = "kube-system"
class AbstractStorageProvisioner(ABC):
def __init__(self, name):
self.name = name
@abstractmethod
def deploy(self):
"""
Deploy provisioner on cluster
"""
@abstractmethod
def delete(self):
"""
Delete provisioner from cluster
"""
class EFSProvisioner(AbstractStorageProvisioner):
def __init__(self, efs_id, efs_region, chart_name="efs"):
super().__init__(chart_name)
self.efs_id = efs_id
self.efs_region = efs_region
self.helm = None
def set_helm_connector(self, helm):
self.helm = helm
def deploy(self):
chart_opts = {
"efsProvisioner.efsFileSystemId": self.efs_id,
"efsProvisioner.awsRegion": self.efs_region,
"efsProvisioner.storageClass.name": "shared-storage",
}
res = self.helm.install(
"stable/efs-provisioner",
self.name,
set_values=chart_opts,
fail_on_err=False,
)
if res.returncode == 0:
return
if re.match(r"Error: cannot re-use a name that is still in use", res.stderr):
warnings.warn(
"Kubernetes Cluster not empty. EFS provisioner already exists."
)
else:
print(res.stderr)
raise subprocess.CalledProcessError(
res.returncode, res.args, output=res.stdout, stderr=res.stderr
)
def delete(self):
try:
self.helm.delete(self.name)
except subprocess.CalledProcessError as exc:
print("deletion of EFS-provisioner failed: ", exc)
class TestCluster:
def __init__(self, kube_config, storage_provisioner, registry):
self.kube_config = kube_config
self.registry = registry
self.storage_provisioner = storage_provisioner
self.current_context = None
self.helm = None
self.namespaces = list()
def _load_kube_config(self):
config.load_kube_config(config_file=self.kube_config)
_, context = config.list_kube_config_contexts(config_file=self.kube_config)
self.current_context = context["name"]
def create_image_pull_secret(self, namespace="default"):
secret_metadata = client.V1ObjectMeta(name="image-pull-secret")
auth_string = str.encode(
"%s:%s" % (self.registry["user"], self.registry["pwd"])
)
secret_data = {
"auths": {
self.registry["url"]: {
"auth": base64.b64encode(auth_string).decode("utf-8")
}
}
}
secret_data = json.dumps(secret_data).encode()
secret_body = client.V1Secret(
api_version="v1",
kind="Secret",
metadata=secret_metadata,
type="kubernetes.io/dockerconfigjson",
data={".dockerconfigjson": base64.b64encode(secret_data).decode("utf-8")},
)
core_v1 = client.CoreV1Api()
try:
core_v1.create_namespaced_secret(namespace, secret_body)
except client.rest.ApiException as exc:
if exc.status == 409 and exc.reason == "Conflict":
warnings.warn(
"Kubernetes Cluster not empty. Image pull secret already exists."
)
else:
raise exc
def create_namespace(self, name):
namespace_metadata = client.V1ObjectMeta(name=name)
namespace_body = client.V1Namespace(
kind="Namespace", api_version="v1", metadata=namespace_metadata
)
core_v1 = client.CoreV1Api()
core_v1.create_namespace(body=namespace_body)
self.namespaces.append(name)
self.create_image_pull_secret(name)
def delete_namespace(self, name):
core_v1 = client.CoreV1Api()
core_v1.delete_namespace(name, body=client.V1DeleteOptions())
self.namespaces.remove(name)
def install_storage_provisioner(self):
self.storage_provisioner.set_helm_connector(self.helm)
self.storage_provisioner.deploy()
def setup(self):
self._load_kube_config()
self.create_image_pull_secret()
self.helm = Helm(self.kube_config, self.current_context)
self.install_storage_provisioner()
def cleanup(self):
while self.namespaces:
self.helm.delete_all(
namespace=self.namespaces[0], exceptions=[self.storage_provisioner.name]
)
self.delete_namespace(self.namespaces[0])
self.storage_provisioner.delete()
core_v1 = client.CoreV1Api()
core_v1.delete_namespaced_secret(
"image-pull-secret", "default", body=client.V1DeleteOptions()
)
@pytest.fixture(scope="session")
def test_cluster(request):
kube_config = request.config.getoption("--kubeconfig")
infra_provider = request.config.getoption("--infra-provider").lower()
if infra_provider == "aws":
efs_id = request.config.getoption("--efs-id")
if not efs_id:
raise ArgumentTypeError("No EFS-ID was provided.")
efs_region = request.config.getoption("--efs-region")
if not efs_region:
raise ArgumentTypeError("No EFS-region was provided.")
storage_provisioner = EFSProvisioner(efs_id, efs_region)
registry = {
"url": request.config.getoption("--registry"),
"user": request.config.getoption("--registry-user"),
"pwd": request.config.getoption("--registry-pwd"),
}
test_cluster = TestCluster(kube_config, storage_provisioner, registry)
test_cluster.setup()
yield test_cluster
test_cluster.cleanup()