diff --git a/python-clusters/create-eks-cluster/cluster.json b/python-clusters/create-eks-cluster/cluster.json
index beed39e..fa139a8 100644
--- a/python-clusters/create-eks-cluster/cluster.json
+++ b/python-clusters/create-eks-cluster/cluster.json
@@ -106,6 +106,13 @@
             "description": "If registry.k8s.io isn't reachable",
             "defaultValue" : "registry.k8s.io"
         },
+        {
+            "name": "autoscalerImageTagOverride",
+            "label": "Autoscaler image tag override",
+            "type": "STRING",
+            "description": "For airgapped registries or registries that block tag listing. Leave empty to discover a public-style version tag.",
+            "mandatory": false
+        },
         {
             "name": "advanced",
             "label": "Use Advanced Configuration",
diff --git a/python-clusters/create-eks-cluster/cluster.py b/python-clusters/create-eks-cluster/cluster.py
index 029d670..ad8e54f 100644
--- a/python-clusters/create-eks-cluster/cluster.py
+++ b/python-clusters/create-eks-cluster/cluster.py
@@ -295,9 +295,7 @@ def add_vm_to_sg():
             logging.info("At least one node group is autoscaling, ensuring autoscaler")
             autoscaled_taints = list(autoscaled_node_pools_taints) if autoscaled_node_pools_taints else []
             autoscaler_registry_url = self.config.get("autoscalerRegistryURL", "registry.k8s.io")
-            add_autoscaler_if_needed(
-                self.cluster_id, self.config, cluster_info, kube_config_path, autoscaled_taints, autoscaler_registry_url
-            )
+            add_autoscaler_if_needed(self.cluster_id, self.config, cluster_info, kube_config_path, autoscaled_taints, autoscaler_registry_url)
 
         with open(kube_config_path, "r") as f:
             kube_config = yaml.safe_load(f)
diff --git a/python-lib/dku_kube/autoscaler.py b/python-lib/dku_kube/autoscaler.py
index 096ca63..2070372 100644
--- a/python-lib/dku_kube/autoscaler.py
+++ b/python-lib/dku_kube/autoscaler.py
@@ -1,19 +1,39 @@
 import os
 import json
 import logging
+import re
+import urllib.parse
+import requests
 import yaml
 from .kubectl_command import run_with_timeout
 from dku_utils.access import _is_none_or_blank
-from dku_utils.tools_version import strip_kubernetes_version
+from dku_utils.tools_version import parse_kubernetes_version, strip_kubernetes_version
 from dku_utils.taints import Toleration
 
+AUTOSCALER_IMAGE_REPOSITORY = "autoscaling/cluster-autoscaler"
+AUTOSCALER_TAG_RE = re.compile(r"^v([0-9]+)\.([0-9]+)\.([0-9]+)$")
+# Upstream Cluster Autoscaler compatibility guidance says autoscaler and
+# Kubernetes major/minor versions match from Kubernetes 1.12 onward.
+AUTOSCALER_MATCHING_VERSION_CUTOFF = (1, 12)
+AUTOSCALER_MATCHING_VERSION_CUTOFF_MINOR = "1.12"
+
+# Used only when registry tag discovery is unavailable, which preserves the
+# existing custom-registry workflow for registries that do not expose tags/list.
+# Keep values pinned to tags known to exist in registry.k8s.io.
 # fmt: off
-AUTOSCALER_IMAGES = {
-    "1.24": "v1.24.3",
-    "1.25": "v1.25.3",
-    "1.26": "v1.26.4",
-    "1.27": "v1.27.3",
-    "1.28": "v1.28.0"
+AUTOSCALER_IMAGE_FALLBACKS = {
+    "1.24": "v1.24.3",
+    "1.25": "v1.25.3",
+    "1.26": "v1.26.4",
+    "1.27": "v1.27.3",
+    "1.28": "v1.28.0",
+    "1.29": "v1.29.5",
+    "1.30": "v1.30.7",
+    "1.31": "v1.31.5",
+    "1.32": "v1.32.7",
+    "1.33": "v1.33.4",
+    "1.34": "v1.34.3",
+    "1.35": "v1.35.0",
 }
 # fmt: on
 
@@ -27,22 +47,172 @@ def has_autoscaler(kube_config_path):
     return len(out.strip()) > 0
 
 
+def _parse_autoscaler_tag(tag):
+    """
+    Returns the (major, minor, patch) integers of a "vMAJOR.MINOR.PATCH" tag, or None for any other tag
+    """
+    match = AUTOSCALER_TAG_RE.match(tag)
+    if match is None:
+        return None
+    return tuple(int(part) for part in match.groups())
+
+
+def _get_registry_tags_url(autoscaler_registry_url):
+    """
+    Builds the "/v2/<repository>/tags/list" URL of the cluster autoscaler image repository.
+    The registry URL may carry a path prefix; https is assumed when it has no scheme.
+    """
+    registry_url = autoscaler_registry_url.rstrip("/")
+    # Look for "://" rather than relying on urlparse: a scheme-less "host:port"
+    # registry can be parsed with the host as scheme and the port as path.
+    if "://" not in registry_url:
+        registry_url = "https://%s" % registry_url
+    parsed_registry_url = urllib.parse.urlparse(registry_url)
+
+    repository_path = "/".join(path_part.strip("/") for path_part in [parsed_registry_url.path, AUTOSCALER_IMAGE_REPOSITORY] if path_part.strip("/"))
+    return "%s://%s/v2/%s/tags/list" % (parsed_registry_url.scheme, parsed_registry_url.netloc, repository_path)
+
+
+def _discover_published_autoscaler_tags(autoscaler_registry_url):
+    """
+    Queries the registry for the cluster autoscaler image tags and returns the "vMAJOR.MINOR.PATCH" ones.
+    Raises a requests exception on connection or HTTP errors, and ValueError on an unexpected answer.
+    """
+    tags_url = _get_registry_tags_url(autoscaler_registry_url)
+    logging.info("Retrieving published cluster autoscaler image tags from %s" % tags_url)
+
+    response = requests.get(tags_url, headers={"Accept": "application/json", "User-Agent": "DSS EKS Plugin"}, timeout=10)
+    if not response.ok:
+        logging.warning(
+            "Retrieving the cluster autoscaler image tags from URL '%s' failed with status: %s %s" % (tags_url, response.status_code, response.reason)
+        )
+        logging.warning("Content of failed request: %s" % response.content)
+        response.raise_for_status()
+
+    tags_response = response.json()
+    if not isinstance(tags_response, dict):
+        raise ValueError("Unexpected tags list response from %s" % tags_url)
+    # A registry can answer "tags": null for a repository without any tag.
+    tags = tags_response.get("tags") or []
+    return [tag for tag in tags if _parse_autoscaler_tag(tag) is not None]
+
+
+def _select_matching_autoscaler_tag_from_tags(tags, parsed_kubernetes_minor):
+    """
+    Returns the highest-versioned tag whose major/minor matches the Kubernetes (major, minor) tuple,
+    or None when no tag matches. Versions below the cutoff are matched against the cutoff minor.
+    """
+    if parsed_kubernetes_minor >= AUTOSCALER_MATCHING_VERSION_CUTOFF:
+        # From Kubernetes 1.12 onward, upstream expects the autoscaler major/minor
+        # to match the Kubernetes major/minor; choose the latest patch for that minor.
+        matching_minor = parsed_kubernetes_minor
+    else:
+        # Before Kubernetes 1.12 there is no same-minor compatibility rule to apply.
+        # Use the first version where that rule exists, rather than jumping to latest.
+        matching_minor = AUTOSCALER_MATCHING_VERSION_CUTOFF
+
+    matching_tags = []
+    for tag in tags:
+        parsed_tag = _parse_autoscaler_tag(tag)
+        if parsed_tag is not None and parsed_tag[:2] == matching_minor:
+            matching_tags.append((parsed_tag, tag))
+
+    if matching_tags:
+        matching_tags.sort()
+        return matching_tags[-1][1]
+    return None
+
+
+def select_autoscaler_image(kubernetes_version, autoscaler_registry_url, autoscaler_image_tag_override=None):
+    """
+    Returns the cluster autoscaler image tag to deploy for the given Kubernetes version: the configured
+    override if any, else the best matching tag published by the registry, else a bundled fallback tag.
+    """
+    kubernetes_major, kubernetes_minor, kubernetes_version_string = parse_kubernetes_version(kubernetes_version)
+    parsed_kubernetes_minor = (kubernetes_major, kubernetes_minor)
+
+    if not _is_none_or_blank(autoscaler_image_tag_override):
+        autoscaler_image_tag_override = autoscaler_image_tag_override.strip()
+        logging.info(
+            "Using configured cluster autoscaler image tag override %s for Kubernetes %s" % (autoscaler_image_tag_override, kubernetes_version_string)
+        )
+        return autoscaler_image_tag_override
+
+    try:
+        published_tags = _discover_published_autoscaler_tags(autoscaler_registry_url)
+        selected_tag = _select_matching_autoscaler_tag_from_tags(published_tags, parsed_kubernetes_minor)
+
+        if selected_tag is None:
+            logging.warning(
+                "No published cluster autoscaler image tag matches Kubernetes %s in registry %s. Fallback will be used."
+                % (
+                    kubernetes_version_string,
+                    autoscaler_registry_url,
+                )
+            )
+        elif parsed_kubernetes_minor < AUTOSCALER_MATCHING_VERSION_CUTOFF:
+            logging.warning(
+                "Kubernetes %s is below the cluster autoscaler version-matching cutoff %s. Using tag %s from registry %s."
+                % (
+                    kubernetes_version_string,
+                    AUTOSCALER_MATCHING_VERSION_CUTOFF_MINOR,
+                    selected_tag,
+                    autoscaler_registry_url,
+                )
+            )
+            return selected_tag
+        else:
+            logging.info("Using cluster autoscaler image tag %s for Kubernetes %s" % (selected_tag, kubernetes_version_string))
+            return selected_tag
+    except (requests.RequestException, ValueError, KeyError) as e:
+        logging.warning(
+            "Unable to retrieve published cluster autoscaler image tags from registry %s. Fallback will be used.\n %s." % (autoscaler_registry_url, e)
+        )
+
+    fallback_tags = list(AUTOSCALER_IMAGE_FALLBACKS.values())
+    fallback_tag = _select_matching_autoscaler_tag_from_tags(fallback_tags, parsed_kubernetes_minor)
+    if fallback_tag is not None:
+        if parsed_kubernetes_minor < AUTOSCALER_MATCHING_VERSION_CUTOFF:
+            logging.warning(
+                "Kubernetes %s is below the cluster autoscaler version-matching cutoff %s. Using bundled fallback tag %s."
+                % (
+                    kubernetes_version_string,
+                    AUTOSCALER_MATCHING_VERSION_CUTOFF_MINOR,
+                    fallback_tag,
+                )
+            )
+        else:
+            logging.info("Using bundled fallback tag %s for Kubernetes %s." % (fallback_tag, kubernetes_version_string))
+        return fallback_tag
+
+    latest_supported_version = sorted(AUTOSCALER_IMAGE_FALLBACKS.keys(), key=lambda version: tuple(int(part) for part in version.split(".")))[-1]
+    latest_fallback_tag = AUTOSCALER_IMAGE_FALLBACKS[latest_supported_version]
+    logging.info(
+        "No bundled fallback matches Kubernetes %s. Using latest bundled fallback tag %s instead."
+        % (
+            kubernetes_version_string,
+            latest_fallback_tag,
+        )
+    )
+    return latest_fallback_tag
+
+
 def add_autoscaler_if_needed(cluster_id, cluster_config, cluster_def, kube_config_path, taints, autoscaler_registry_url):
     if not has_autoscaler(kube_config_path):
         kubernetes_version = cluster_config.get("k8sVersion", None)
-        if _is_none_or_blank(kubernetes_version):
+        if _is_none_or_blank(kubernetes_version) or kubernetes_version.strip().lower() == "latest":
             kubernetes_version = cluster_def.get("Version")
+        if _is_none_or_blank(kubernetes_version):
+            raise Exception("No Kubernetes version found in cluster config or EKS cluster definition")
         kubernetes_version = strip_kubernetes_version(kubernetes_version)
 
         autoscaler_file_path = "autoscaler.yaml"
 
-        if float(kubernetes_version) < 1.24:
-            autoscaler_image = AUTOSCALER_IMAGES.get("1.24", "v1.24.3")
-        else:
-            autoscaler_image = AUTOSCALER_IMAGES.get(kubernetes_version, "v1.28.0")
+        autoscaler_image_tag_override = cluster_config.get("autoscalerImageTagOverride", None)
+        autoscaler_image_tag = select_autoscaler_image(kubernetes_version, autoscaler_registry_url, autoscaler_image_tag_override)
 
         autoscaler_full_config = list(yaml.safe_load_all(get_autoscaler_roles()))
-        autoscaler_config = yaml.safe_load(get_autoscaler_config(cluster_id, autoscaler_image, autoscaler_registry_url))
+        autoscaler_config = yaml.safe_load(get_autoscaler_config(cluster_id, autoscaler_image_tag, autoscaler_registry_url))
         tolerations = set()
 
         # If there are any taints to patch the autoscaler with in the node group(s) to create,
@@ -240,4 +410,3 @@ def get_autoscaler_config(cluster_id, autoscaler_image_version, autoscaler_regis
           hostPath:
             path: "/etc/ssl/certs/ca-bundle.crt"
 """ % {"autoscalerimageversion": autoscaler_image_version, "clusterid": cluster_id, "autoscalerregistryurl": autoscaler_registry_url}
-
diff --git a/python-lib/dku_utils/tools_version.py b/python-lib/dku_utils/tools_version.py
index fd7f78f..4b939f7 100644
--- a/python-lib/dku_utils/tools_version.py
+++ b/python-lib/dku_utils/tools_version.py
@@ -2,6 +2,8 @@ import re
 
 from dku_kube.kubectl_command import run_with_timeout
 
+KUBERNETES_VERSION_RE = re.compile(r"^([0-9]+)\.([0-9]+)(?:\.([0-9]+))?$")
+
 
 def get_kubectl_version():
     cmd = ["kubectl", "version", "--client", "-o", "json"]
@@ -23,29 +25,43 @@ def get_kubectl_version_int(kubectl_version):
     Extracts the integers representing the major version and the minor version
     coming from outcome of `kubectl version` command
     """
-    # the kubectl version downloaded from Amazon website has a minor version finishing by '+'
-    # keeping only the first numeric sequence for the minor version
     if "major" not in kubectl_version or "minor" not in kubectl_version:
         raise Exception("Kubectl version found on the machine: %s. It is not correctly formatted" % kubectl_version_to_string(kubectl_version))
-    regex_minor_int = re.compile("^[^0-9]*([0-9]+)([^0-9].*$|$)")
-    search_results_minor_int = re.search(regex_minor_int, kubectl_version["minor"])
-    if not search_results_minor_int or not search_results_minor_int.groups():
+
+    # The kubectl version downloaded from Amazon can have a minor version ending with '+';
+    # normalize it before using the generic Kubernetes major/minor parser.
+    normalized_version = strip_kubernetes_version(kubectl_version_to_string(kubectl_version))
+    try:
+        major_int, minor_int, _ = parse_kubernetes_version(normalized_version)
+    except Exception:
         raise Exception("Kubectl version found on the machine: %s. It was not possible to parse" % kubectl_version_to_string(kubectl_version))
-    minor_int = int(search_results_minor_int.groups()[0])
-    return int(kubectl_version["major"]), minor_int
+    return major_int, minor_int
 
 
 def strip_kubernetes_version(k8s_version_input):
     """
     Removes any additional characters from the Kubernetes version specified in the cluster creation form
     """
-    regex_k8s_version = re.compile("^[^0-9]*([0-9]+\.?[0-9]+)([^0-9].*$|$)")
+    regex_k8s_version = re.compile(r"^[^0-9]*([0-9]+\.?[0-9]+)([^0-9].*$|$)")
     search_results_k8s_version = re.search(regex_k8s_version, k8s_version_input)
     if not search_results_k8s_version or not search_results_k8s_version.groups():
-        raise Exception("Kubectl version specified: %s. No valid Kubernetes version found", k8s_version_input)
+        raise Exception("Kubectl version specified: %s. No valid Kubernetes version found" % k8s_version_input)
     return search_results_k8s_version.groups()[0]
 
 
+def parse_kubernetes_version(version):
+    """
+    Parses a Kubernetes version string and returns its major, minor, and major.minor string.
+    EKS cluster versions are usually major.minor, but a patch version is accepted too.
+    """
+    match = KUBERNETES_VERSION_RE.match(version)
+    if match is None:
+        raise Exception("Kubernetes version specified: %s. No valid Kubernetes major/minor version found" % version)
+    major = int(match.group(1))
+    minor = int(match.group(2))
+    return major, minor, "%s.%s" % (major, minor)
+
+
 def get_authenticator_version():
     cmd = ["aws-iam-authenticator", "version", "-o", "json"]
     out, err = run_with_timeout(cmd, timeout=30)
diff --git a/python-runnables/add-autoscaler/runnable.json b/python-runnables/add-autoscaler/runnable.json
index 3109d74..9cc1fcc 100644
--- a/python-runnables/add-autoscaler/runnable.json
+++ b/python-runnables/add-autoscaler/runnable.json
@@ -32,6 +32,13 @@
             "type": "STRING",
             "description": "If registry.k8s.io isn't reachable",
             "defaultValue" : "registry.k8s.io"
+        },
+        {
+            "name": "autoscalerImageTagOverride",
+            "label": "Autoscaler image tag override",
+            "type": "STRING",
+            "description": "For airgapped registries or registries that block tag listing. Leave empty to discover a public-style version tag.",
+            "mandatory": false
         }
     ]
 }
diff --git a/python-runnables/add-autoscaler/runnable.py b/python-runnables/add-autoscaler/runnable.py
index 3dd260c..511d0b6 100644
--- a/python-runnables/add-autoscaler/runnable.py
+++ b/python-runnables/add-autoscaler/runnable.py
@@ -31,4 +31,4 @@ def run(self, progress_callback):
         else:
             autoscaler_registry_url = self.config.get("autoscalerRegistryURL", "registry.k8s.io")
             add_autoscaler_if_needed(cluster_id, self.config, cluster_def, kube_config_path, [], autoscaler_registry_url)
-        return "
Created an autoscaler pod
"
+        return "
Created an autoscaler pod
"
diff --git a/python-runnables/add-node-pool/runnable.json b/python-runnables/add-node-pool/runnable.json
index 8d79f7b..5010b1c 100644
--- a/python-runnables/add-node-pool/runnable.json
+++ b/python-runnables/add-node-pool/runnable.json
@@ -46,6 +46,13 @@
             "type": "STRING",
             "description": "If registry.k8s.io isn't reachable",
             "defaultValue" : "registry.k8s.io"
+        },
+        {
+            "name": "autoscalerImageTagOverride",
+            "label": "Autoscaler image tag override",
+            "type": "STRING",
+            "description": "For airgapped registries or registries that block tag listing. Leave empty to discover a public-style version tag.",
+            "mandatory": false
         }
     ]
 }
diff --git a/python-runnables/add-node-pool/runnable.py b/python-runnables/add-node-pool/runnable.py
index 9f8288d..539c49f 100644
--- a/python-runnables/add-node-pool/runnable.py
+++ b/python-runnables/add-node-pool/runnable.py
@@ -108,7 +108,9 @@ def run(self, progress_callback):
             if node_pool.get("numNodesAutoscaling", False):
                 logging.info("Nodegroup is autoscaling, ensuring autoscaler")
                 autoscaler_registry_url = self.config.get("autoscalerRegistryURL", "registry.k8s.io")
-                add_autoscaler_if_needed(cluster_id, self.config, cluster_data.get("cluster"), kube_config_path, node_group_taints, autoscaler_registry_url)
+                add_autoscaler_if_needed(
+                    cluster_id, self.config, cluster_data.get("cluster"), kube_config_path, node_group_taints, autoscaler_registry_url
+                )
 
             if node_pool.get("enableGPU", False):
                 logging.info("Nodegroup is GPU-enabled, ensuring NVIDIA GPU Drivers")