Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions python-clusters/create-eks-cluster/cluster.json
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,13 @@
"description": "If registry.k8s.io isn't reachable",
"defaultValue" : "registry.k8s.io"
},
{
"name": "autoscalerImageTagOverride",
"label": "Autoscaler image tag override",
"type": "STRING",
"description": "For airgapped registries or registries that block tag listing. Leave empty to discover a public-style version tag.",
"mandatory": false
},
{
"name": "advanced",
"label": "Use Advanced Configuration",
Expand Down
4 changes: 1 addition & 3 deletions python-clusters/create-eks-cluster/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,9 +295,7 @@ def add_vm_to_sg():
logging.info("At least one node group is autoscaling, ensuring autoscaler")
autoscaled_taints = list(autoscaled_node_pools_taints) if autoscaled_node_pools_taints else []
autoscaler_registry_url = self.config.get("autoscalerRegistryURL", "registry.k8s.io")
add_autoscaler_if_needed(
self.cluster_id, self.config, cluster_info, kube_config_path, autoscaled_taints, autoscaler_registry_url
)
add_autoscaler_if_needed(self.cluster_id, self.config, cluster_info, kube_config_path, autoscaled_taints, autoscaler_registry_url)

with open(kube_config_path, "r") as f:
kube_config = yaml.safe_load(f)
Expand Down
172 changes: 158 additions & 14 deletions python-lib/dku_kube/autoscaler.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,39 @@
import os
import json
import logging
import re
import urllib.parse
import requests
import yaml
from .kubectl_command import run_with_timeout
from dku_utils.access import _is_none_or_blank
from dku_utils.tools_version import strip_kubernetes_version
from dku_utils.tools_version import parse_kubernetes_version, strip_kubernetes_version
from dku_utils.taints import Toleration

AUTOSCALER_IMAGE_REPOSITORY = "autoscaling/cluster-autoscaler"
AUTOSCALER_TAG_RE = re.compile(r"^v([0-9]+)\.([0-9]+)\.([0-9]+)$")
# Upstream Cluster Autoscaler compatibility guidance says autoscaler and
# Kubernetes major/minor versions match from Kubernetes 1.12 onward.
AUTOSCALER_MATCHING_VERSION_CUTOFF = (1, 12)
AUTOSCALER_MATCHING_VERSION_CUTOFF_MINOR = "1.12"

# Used only when registry tag discovery is unavailable, which preserves the
# existing custom-registry workflow for registries that do not expose tags/list.
# Keep values pinned to tags known to exist in registry.k8s.io.
# fmt: off
AUTOSCALER_IMAGES = {
"1.24": "v1.24.3",
"1.25": "v1.25.3",
"1.26": "v1.26.4",
"1.27": "v1.27.3",
"1.28": "v1.28.0"
AUTOSCALER_IMAGE_FALLBACKS = {
"1.24": "v1.24.3",
"1.25": "v1.25.3",
"1.26": "v1.26.4",
"1.27": "v1.27.3",
"1.28": "v1.28.0",
"1.29": "v1.29.5",
"1.30": "v1.30.7",
"1.31": "v1.31.5",
"1.32": "v1.32.7",
"1.33": "v1.33.4",
"1.34": "v1.34.3",
"1.35": "v1.35.0",
}
# fmt: on

Expand All @@ -27,22 +47,147 @@ def has_autoscaler(kube_config_path):
return len(out.strip()) > 0


def _parse_autoscaler_tag(tag):
match = AUTOSCALER_TAG_RE.match(tag)
if match is None:
return None
return tuple(int(part) for part in match.groups())


def _get_registry_tags_url(autoscaler_registry_url):
registry_url = autoscaler_registry_url.rstrip("/")
parsed_registry_url = urllib.parse.urlparse(registry_url)
if parsed_registry_url.scheme == "":
parsed_registry_url = urllib.parse.urlparse("https://%s" % registry_url)

repository_path = "/".join(path_part.strip("/") for path_part in [parsed_registry_url.path, AUTOSCALER_IMAGE_REPOSITORY] if path_part.strip("/"))
return "%s://%s/v2/%s/tags/list" % (parsed_registry_url.scheme, parsed_registry_url.netloc, repository_path)


def _discover_published_autoscaler_tags(autoscaler_registry_url):
tags_url = _get_registry_tags_url(autoscaler_registry_url)
logging.info("Retrieving published cluster autoscaler image tags from %s" % tags_url)

response = requests.get(tags_url, headers={"Accept": "application/json", "User-Agent": "DSS EKS Plugin"}, timeout=10)
if not response.ok:
logging.warning(
"Retrieving the cluster autoscaler image tags from URL '%s' failed with status: %s %s" % (tags_url, response.status_code, response.reason)
)
logging.warning("Content of failed request: %s" % response.content)
response.raise_for_status()

tags_response = response.json()
tags = tags_response.get("tags", [])
return [tag for tag in tags if _parse_autoscaler_tag(tag) is not None]


def _select_matching_autoscaler_tag_from_tags(tags, parsed_kubernetes_minor):
if parsed_kubernetes_minor >= AUTOSCALER_MATCHING_VERSION_CUTOFF:
# From Kubernetes 1.12 onward, upstream expects the autoscaler major/minor
# to match the Kubernetes major/minor; choose the latest patch for that minor.
matching_minor = parsed_kubernetes_minor
else:
# Before Kubernetes 1.12 there is no same-minor compatibility rule to apply.
# Use the first version where that rule exists, rather than jumping to latest.
matching_minor = AUTOSCALER_MATCHING_VERSION_CUTOFF

matching_tags = []
for tag in tags:
parsed_tag = _parse_autoscaler_tag(tag)
if parsed_tag is not None and parsed_tag[:2] == matching_minor:
matching_tags.append((parsed_tag, tag))

if matching_tags:
matching_tags.sort()
return matching_tags[-1][1]


def select_autoscaler_image(kubernetes_version, autoscaler_registry_url, autoscaler_image_tag_override=None):
kubernetes_major, kubernetes_minor, kubernetes_version_string = parse_kubernetes_version(kubernetes_version)
parsed_kubernetes_minor = (kubernetes_major, kubernetes_minor)

if not _is_none_or_blank(autoscaler_image_tag_override):
autoscaler_image_tag_override = autoscaler_image_tag_override.strip()
logging.info(
"Using configured cluster autoscaler image tag override %s for Kubernetes %s" % (autoscaler_image_tag_override, kubernetes_version_string)
)
return autoscaler_image_tag_override

try:
published_tags = _discover_published_autoscaler_tags(autoscaler_registry_url)
selected_tag = _select_matching_autoscaler_tag_from_tags(published_tags, parsed_kubernetes_minor)

if selected_tag is None:
logging.warning(
"No published cluster autoscaler image tag matches Kubernetes %s in registry %s. Fallback will be used."
% (
kubernetes_version_string,
autoscaler_registry_url,
)
)
elif parsed_kubernetes_minor < AUTOSCALER_MATCHING_VERSION_CUTOFF:
logging.warning(
"Kubernetes %s is below the cluster autoscaler version-matching cutoff %s. Using tag %s from registry %s."
% (
kubernetes_version_string,
AUTOSCALER_MATCHING_VERSION_CUTOFF_MINOR,
selected_tag,
autoscaler_registry_url,
)
)
return selected_tag
else:
logging.info("Using cluster autoscaler image tag %s for Kubernetes %s" % (selected_tag, kubernetes_version_string))
return selected_tag
except (requests.RequestException, ValueError, KeyError) as e:
logging.warning(
"Unable to retrieve published cluster autoscaler image tags from registry %s. Fallback will be used.\n %s." % (autoscaler_registry_url, e)
)

fallback_tags = list(AUTOSCALER_IMAGE_FALLBACKS.values())
fallback_tag = _select_matching_autoscaler_tag_from_tags(fallback_tags, parsed_kubernetes_minor)
if fallback_tag is not None:
if parsed_kubernetes_minor < AUTOSCALER_MATCHING_VERSION_CUTOFF:
logging.warning(
"Kubernetes %s is below the cluster autoscaler version-matching cutoff %s. Using bundled fallback tag %s."
% (
kubernetes_version_string,
AUTOSCALER_MATCHING_VERSION_CUTOFF_MINOR,
fallback_tag,
)
)
else:
logging.info("Using bundled fallback tag %s for Kubernetes %s." % (fallback_tag, kubernetes_version_string))
return fallback_tag

latest_supported_version = sorted(AUTOSCALER_IMAGE_FALLBACKS.keys(), key=lambda version: tuple(int(part) for part in version.split(".")))[-1]
latest_fallback_tag = AUTOSCALER_IMAGE_FALLBACKS[latest_supported_version]
logging.info(
"No bundled fallback matches Kubernetes %s. Using latest bundled fallback tag %s instead."
% (
kubernetes_version_string,
latest_fallback_tag,
)
)
return latest_fallback_tag


def add_autoscaler_if_needed(cluster_id, cluster_config, cluster_def, kube_config_path, taints, autoscaler_registry_url):
if not has_autoscaler(kube_config_path):
kubernetes_version = cluster_config.get("k8sVersion", None)
if _is_none_or_blank(kubernetes_version):
if _is_none_or_blank(kubernetes_version) or kubernetes_version.strip().lower() == "latest":
kubernetes_version = cluster_def.get("Version")
if _is_none_or_blank(kubernetes_version):
raise Exception("No Kubernetes version found in cluster config or EKS cluster definition")

kubernetes_version = strip_kubernetes_version(kubernetes_version)
autoscaler_file_path = "autoscaler.yaml"

if float(kubernetes_version) < 1.24:
autoscaler_image = AUTOSCALER_IMAGES.get("1.24", "v1.24.3")
else:
autoscaler_image = AUTOSCALER_IMAGES.get(kubernetes_version, "v1.28.0")
autoscaler_image_tag_override = cluster_config.get("autoscalerImageTagOverride", None)
autoscaler_image_tag = select_autoscaler_image(kubernetes_version, autoscaler_registry_url, autoscaler_image_tag_override)

autoscaler_full_config = list(yaml.safe_load_all(get_autoscaler_roles()))
autoscaler_config = yaml.safe_load(get_autoscaler_config(cluster_id, autoscaler_image, autoscaler_registry_url))
autoscaler_config = yaml.safe_load(get_autoscaler_config(cluster_id, autoscaler_image_tag, autoscaler_registry_url))
tolerations = set()

# If there are any taints to patch the autoscaler with in the node group(s) to create,
Expand Down Expand Up @@ -240,4 +385,3 @@ def get_autoscaler_config(cluster_id, autoscaler_image_version, autoscaler_regis
hostPath:
path: "/etc/ssl/certs/ca-bundle.crt"
""" % {"autoscalerimageversion": autoscaler_image_version, "clusterid": cluster_id, "autoscalerregistryurl": autoscaler_registry_url}

32 changes: 24 additions & 8 deletions python-lib/dku_utils/tools_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
import re
from dku_kube.kubectl_command import run_with_timeout

KUBERNETES_VERSION_RE = re.compile(r"^([0-9]+)\.([0-9]+)(?:\.([0-9]+))?$")


def get_kubectl_version():
cmd = ["kubectl", "version", "--client", "-o", "json"]
Expand All @@ -23,29 +25,43 @@ def get_kubectl_version_int(kubectl_version):
Extracts the integers representing the major version and the minor version coming from outcome
of `kubectl version` command
"""
# the kubectl version downloaded from Amazon website has a minor version finishing by '+'
# keeping only the first numeric sequence for the minor version
if "major" not in kubectl_version or "minor" not in kubectl_version:
raise Exception("Kubectl version found on the machine: %s. It is not correctly formatted" % kubectl_version_to_string(kubectl_version))
regex_minor_int = re.compile("^[^0-9]*([0-9]+)([^0-9].*$|$)")
search_results_minor_int = re.search(regex_minor_int, kubectl_version["minor"])
if not search_results_minor_int or not search_results_minor_int.groups():

# The kubectl version downloaded from Amazon can have a minor version ending with '+';
# normalize it before using the generic Kubernetes major/minor parser.
normalized_version = strip_kubernetes_version(kubectl_version_to_string(kubectl_version))
try:
major_int, minor_int, _ = parse_kubernetes_version(normalized_version)
except Exception:
raise Exception("Kubectl version found on the machine: %s. It was not possible to parse" % kubectl_version_to_string(kubectl_version))
minor_int = int(search_results_minor_int.groups()[0])
return int(kubectl_version["major"]), minor_int
return major_int, minor_int


def strip_kubernetes_version(k8s_version_input):
"""
Removes any additional characters from the Kubernetes version specified in the cluster creation form
"""
regex_k8s_version = re.compile("^[^0-9]*([0-9]+\.?[0-9]+)([^0-9].*$|$)")
regex_k8s_version = re.compile(r"^[^0-9]*([0-9]+\.?[0-9]+)([^0-9].*$|$)")
search_results_k8s_version = re.search(regex_k8s_version, k8s_version_input)
if not search_results_k8s_version or not search_results_k8s_version.groups():
raise Exception("Kubectl version specified: %s. No valid Kubernetes version found", k8s_version_input)
return search_results_k8s_version.groups()[0]


def parse_kubernetes_version(version):
"""
Parses a Kubernetes version string and returns its major, minor, and major.minor string.
EKS cluster versions are usually major.minor, but a patch version is accepted too.
"""
match = KUBERNETES_VERSION_RE.match(version)
if match is None:
raise Exception("Kubernetes version specified: %s. No valid Kubernetes major/minor version found" % version)
major = int(match.group(1))
minor = int(match.group(2))
return major, minor, "%s.%s" % (major, minor)


def get_authenticator_version():
cmd = ["aws-iam-authenticator", "version", "-o", "json"]
out, err = run_with_timeout(cmd, timeout=30)
Expand Down
7 changes: 7 additions & 0 deletions python-runnables/add-autoscaler/runnable.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@
"type": "STRING",
"description": "If registry.k8s.io isn't reachable",
"defaultValue" : "registry.k8s.io"
},
{
"name": "autoscalerImageTagOverride",
"label": "Autoscaler image tag override",
"type": "STRING",
"description": "For airgapped registries or registries that block tag listing. Leave empty to discover a public-style version tag.",
"mandatory": false
}
]
}
2 changes: 1 addition & 1 deletion python-runnables/add-autoscaler/runnable.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,4 @@ def run(self, progress_callback):
else:
autoscaler_registry_url = self.config.get("autoscalerRegistryURL", "registry.k8s.io")
add_autoscaler_if_needed(cluster_id, self.config, cluster_def, kube_config_path, [], autoscaler_registry_url)
return "<h5>Created an autoscaler pod<h5>"
return "<h5>Created an autoscaler pod</h5>"
7 changes: 7 additions & 0 deletions python-runnables/add-node-pool/runnable.json
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@
"type": "STRING",
"description": "If registry.k8s.io isn't reachable",
"defaultValue" : "registry.k8s.io"
},
{
"name": "autoscalerImageTagOverride",
"label": "Autoscaler image tag override",
"type": "STRING",
"description": "For airgapped registries or registries that block tag listing. Leave empty to discover a public-style version tag.",
"mandatory": false
}
]
}
4 changes: 3 additions & 1 deletion python-runnables/add-node-pool/runnable.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,9 @@ def run(self, progress_callback):
if node_pool.get("numNodesAutoscaling", False):
logging.info("Nodegroup is autoscaling, ensuring autoscaler")
autoscaler_registry_url = self.config.get("autoscalerRegistryURL", "registry.k8s.io")
add_autoscaler_if_needed(cluster_id, self.config, cluster_data.get("cluster"), kube_config_path, node_group_taints, autoscaler_registry_url)
add_autoscaler_if_needed(
cluster_id, self.config, cluster_data.get("cluster"), kube_config_path, node_group_taints, autoscaler_registry_url
)

if node_pool.get("enableGPU", False):
logging.info("Nodegroup is GPU-enabled, ensuring NVIDIA GPU Drivers")
Expand Down