blob: eb949689b93bd4014802886fcaf16cd2e99b0b63 [file] [log] [blame]
# pylint: disable=W0613
# Copyright (C) 2022 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import json
import warnings
from kubernetes import client, config
import pytest
from .helm.client import HelmClient
class Cluster:
    """Handle to the Kubernetes cluster that the tests run against.

    Tracks the namespaces created through this object and the image pull
    secrets that are applied to each of those namespaces.
    """

    def __init__(self, kube_config):
        """Load the kubeconfig and create a Helm client bound to it.

        Args:
            kube_config: Location of the kubeconfig file; handed to the
                kubernetes client as ``config_file`` and to ``HelmClient``.
        """
        self.kube_config = kube_config
        # V1Secret objects registered through add_container_registry().
        self.image_pull_secrets = []
        # Names of the namespaces created through create_namespace().
        self.namespaces = []
        self.helm = HelmClient(self.kube_config, self._load_kube_config())

    def _load_kube_config(self):
        """Load the kubeconfig into the kubernetes client.

        Returns:
            The ``"name"`` entry of the second item returned by
            ``config.list_kube_config_contexts`` (presumably the active
            context; see the kubernetes client documentation).
        """
        config.load_kube_config(config_file=self.kube_config)
        _all_contexts, active_context = config.list_kube_config_contexts(
            config_file=self.kube_config
        )
        return active_context["name"]

    def _apply_image_pull_secrets(self, namespace):
        """Create every registered image pull secret inside ``namespace``.

        A 409 "Conflict" answer from the API only produces a warning; any
        other ``ApiException`` is re-raised.
        """
        for pull_secret in self.image_pull_secrets:
            try:
                client.CoreV1Api().create_namespaced_secret(namespace, pull_secret)
            except client.rest.ApiException as api_error:
                is_conflict = (
                    api_error.status == 409 and api_error.reason == "Conflict"
                )
                if not is_conflict:
                    raise api_error
                warnings.warn(
                    "Kubernetes Cluster not empty. Image pull secret already exists."
                )

    def add_container_registry(self, secret_name, url, user, pwd):
        """Register a ``kubernetes.io/dockerconfigjson`` secret for a registry.

        The secret is only stored on this object here; it is sent to the
        cluster when create_namespace() applies the stored secrets.

        Args:
            secret_name: Name given to the secret's metadata.
            url: Registry URL used as key in the ``auths`` mapping.
            user: Registry user name.
            pwd: Registry password.
        """
        basic_auth = base64.b64encode(f"{user}:{pwd}".encode()).decode("utf-8")
        docker_config = {"auths": {url: {"auth": basic_auth}}}
        # Secret data values have to be base64 encoded strings.
        encoded_config = base64.b64encode(
            json.dumps(docker_config).encode()
        ).decode("utf-8")

        pull_secret = client.V1Secret(
            api_version="v1",
            kind="Secret",
            metadata=client.V1ObjectMeta(name=secret_name),
            type="kubernetes.io/dockerconfigjson",
            data={".dockerconfigjson": encoded_config},
        )
        self.image_pull_secrets.append(pull_secret)

    def create_namespace(self, name):
        """Create the namespace ``name``, track it and apply the pull secrets."""
        body = client.V1Namespace(
            kind="Namespace",
            api_version="v1",
            metadata=client.V1ObjectMeta(name=name),
        )
        client.CoreV1Api().create_namespace(body=body)
        # Track the namespace before applying secrets so that cleanup()
        # still removes it if applying a secret raises.
        self.namespaces.append(name)
        self._apply_image_pull_secrets(name)

    def delete_namespace(self, name):
        """Delete the namespace ``name`` if it was created through this object.

        Names that are not tracked in ``self.namespaces`` are ignored.
        """
        if name in self.namespaces:
            client.CoreV1Api().delete_namespace(name, body=client.V1DeleteOptions())
            self.namespaces.remove(name)

    def cleanup(self):
        """Delete all Helm releases and then the namespace, for every tracked namespace."""
        while self.namespaces:
            current = self.namespaces[0]
            self.helm.delete_all(namespace=current)
            # delete_namespace() drops the entry, which advances the loop.
            self.delete_namespace(current)
@pytest.fixture(scope="session")
def test_cluster(request):
    """Session-scoped fixture yielding a ``Cluster`` for the test run.

    The cluster is built from the ``--kubeconfig`` option, and an image pull
    secret named ``image-pull-secret`` is registered from the ``--registry``,
    ``--registry-user`` and ``--registry-pwd`` options. After the session the
    cluster's ``cleanup()`` is called.
    """
    options = request.config
    cluster = Cluster(options.getoption("--kubeconfig"))

    registry_args = [
        options.getoption(flag)
        for flag in ("--registry", "--registry-user", "--registry-pwd")
    ]
    cluster.add_container_registry("image-pull-secret", *registry_args)

    yield cluster

    cluster.cleanup()
@pytest.fixture(scope="session")
def ldap_credentials(test_cluster):
    """Session-scoped fixture yielding a ``{user: password}`` mapping.

    Reads the ``openldap-users`` secret in the ``openldap`` namespace. Its
    ``users`` and ``passwords`` entries are base64 decoded and split on
    commas; each user is paired with the password at the same position.

    ``test_cluster`` is requested but not used directly; presumably it is
    there so the kubeconfig is loaded before the API call is made.
    """
    secret = client.CoreV1Api().read_namespaced_secret(
        "openldap-users", namespace="openldap"
    )

    def decode_list(key):
        # Secret values arrive base64 encoded; the payload is a comma list.
        return base64.b64decode(secret.data[key]).decode("utf-8").split(",")

    users = decode_list("users")
    passwords = decode_list("passwords")

    # Index lookup (not zip) so a missing password still raises IndexError.
    yield {user: passwords[position] for position, user in enumerate(users)}
@pytest.fixture(scope="session")
def ldap_admin_credentials(test_cluster):
    """Session-scoped fixture yielding the ``("admin", password)`` tuple.

    The password is the base64 decoded ``adminpassword`` entry of the
    ``openldap-admin`` secret in the ``openldap`` namespace.

    ``test_cluster`` is requested but not used directly; presumably it is
    there so the kubeconfig is loaded before the API call is made.
    """
    admin_secret = client.CoreV1Api().read_namespaced_secret(
        "openldap-admin", namespace="openldap"
    )
    encoded_password = admin_secret.data["adminpassword"]
    admin_password = base64.b64decode(encoded_password).decode("utf-8")
    yield "admin", admin_password