Skip to content

Commit d3bc588

Browse files
committed
WIP
1 parent 84dfc46 commit d3bc588

9 files changed

Lines changed: 428 additions & 249 deletions

File tree

README.md

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,32 @@ After the last executor, the `filer` is called once more to process the outputs
5555
and push them to remote locations from the PVC. The PVC is the scrubbed, deleted
5656
and the taskmaster ends, completing the task.
5757

58+
┌─────────────────────────────────────────────────────────┐
59+
│ Kubernetes │
60+
│ │
61+
│ ┌────────────────────────────┐ ┌───────────────────┐ │
62+
│ │ Secret: ftp-secret │ │ ConfigMap/PVC │ │
63+
│ │ - username │ │ - JSON_INPUT.gz │ │
64+
│ │ - password │ │ │ │
65+
│ └──────────▲─────────────────┘ └───────▲───────────┘ │
66+
│ │ | │
67+
│ │ | │
68+
│ │ | │
69+
│ ┌─────────┴────────────────────────────┴────────────┐ │
70+
│ │ Job: taskmaster │ │
71+
│ │ ┌───────────────────────────────────────────────┐ │ │
72+
│ │ │ Pod: taskmaster │ │ │
73+
│ │ │ - Container: taskmaster │ │ │
74+
│ │ │ - Env: TESK_FTP_USERNAME │ │ │
75+
│ │ │ - Env: TESK_FTP_PASSWORD │ │ │
76+
│ │ │ - Args: -f /jsoninput/JSON_INPUT.gz │ │ │
77+
│ │ │ - Mounts: /podinfo │ │ │
78+
│ │ │ /jsoninput │ │ │
79+
│ │ └───────────────────────────────────────────────┘ │ │
80+
│ └───────────────────────────────────────────────────┘ │
81+
│ │
82+
└─────────────────────────────────────────────────────────┘
83+
5884
## Requirements
5985

6086
- A working [Kubernetes](https://kubernetes.io/) cluster version 1.9 and later.

deployment/charts/tesk/values.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,8 @@ ftp:
7070
# If you need FTP configuration, choose one of the 2 methods of providing credentials
7171
classic_ftp_secret: ftp-secret
7272
netrc_secret:
73-
#classic_ftp_secret:
74-
#netrc_secret: netrc-secret
73+
classic_ftp_secret:
74+
netrc_secret: netrc-secret
7575
# If you install FTP locally, but outside of k8s and need a DNS entry for it (because your workflow manager might not like the IP address)
7676
# one way of getting a DNS entry for your FTP service is to use a k8s "service without a selector"
7777
# Put the IP under which your pods see see services running on your host (differs depending on the way you installs K8s)

poetry.lock

Lines changed: 184 additions & 185 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

tesk/api/ga4gh/tes/controllers.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
from tesk.api.ga4gh.tes.models import TesTask
88
from tesk.api.ga4gh.tes.service_info.service_info import ServiceInfo
9+
from tesk.api.ga4gh.tes.task.create_task import CreateTesTask
910
from tesk.api.kubernetes.converter import TesKubernetesConverter
1011
from tesk.api.kubernetes.template import KubernetesTemplateSupplier
1112
from tesk.exceptions import BadRequest, InternalServerError
@@ -42,6 +43,7 @@ def CreateTask(**kwargs) -> dict: # type: ignore
4243
if request_body is None:
4344
logger("Nothing recieved in request body.")
4445
raise BadRequest("No request body recieved.")
46+
CreateTesTask(TesTask(**request_body)).create_task()
4547
except Exception as e:
4648
raise InternalServerError from e
4749

tesk/api/ga4gh/tes/task/create_task.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ def __init__(self, task: TesTask, namespace=TeskConstants.tesk_namespace):
2121
2222
Args:
2323
task: TES task to create.
24-
user: User who creates the task.
2524
namespace: Kubernetes namespace where the task is created.
2625
"""
2726
self.task = task
@@ -45,10 +44,13 @@ def create_task(self):
4544

4645
task_master_job = (
4746
self.tes_kubernetes_converter.from_tes_task_to_k8s_job(
48-
self.task, self.user
47+
self.task,
48+
# self.user
4949
)
5050
)
5151

52+
print(task_master_job)
53+
5254
# TODO: Create ConfigMap
5355
# TODO: Create Job
5456
# TODO Return created job

tesk/api/kubernetes/converter.py

Lines changed: 106 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
)
1414
from kubernetes.client.models import V1Job
1515

16-
from tesk.api.ga4gh.tes.models import TesTask
16+
from tesk.api.ga4gh.tes.models import TesExecutor, TesResources, TesTask
1717
from tesk.api.kubernetes.constants import Constants, K8sConstants
1818
from tesk.api.kubernetes.template import KubernetesTemplateSupplier
1919
from tesk.constants import TeskConstants
@@ -26,7 +26,6 @@
2626
class TesKubernetesConverter:
2727
def __init__(self, namespace=TeskConstants.tesk_namespace):
2828
"""Initialize the converter."""
29-
self.taskmaster_template: V1Job = get_taskmaster_template()
3029
self.taskmaster_env_properties: TaskmasterEnvProperties = (
3130
get_taskmaster_env_property()
3231
)
@@ -82,54 +81,108 @@ def from_tes_task_to_k8s_job(self, task: TesTask):
8281

8382
return taskmsater_job
8483

85-
# def from_tes_task_to_k8s_config_map(self, task, user, job):
86-
# task_master_config_map = V1ConfigMap(
87-
# metadata=V1ObjectMeta(name=job.metadata.name)
88-
# )
89-
# task_master_config_map.metadata.annotations[ANN_TESTASK_NAME_KEY] = task["name"]
90-
# task_master_config_map.metadata.labels[LABEL_USERID_KEY] = user["username"]
91-
92-
# if "tags" in task and "GROUP_NAME" in task["tags"]:
93-
# task_master_config_map.metadata.labels[LABEL_GROUPNAME_KEY] = task["tags"][
94-
# "GROUP_NAME"
95-
# ]
96-
# elif user["is_member"]:
97-
# task_master_config_map.metadata.labels[LABEL_GROUPNAME_KEY] = user[
98-
# "any_group"
99-
# ]
100-
101-
# executors_as_jobs = [
102-
# self.from_tes_executor_to_k8s_job(
103-
# task_master_config_map.metadata.name,
104-
# task["name"],
105-
# executor,
106-
# idx,
107-
# task["resources"],
108-
# user,
109-
# )
110-
# for idx, executor in enumerate(task["executors"])
111-
# ]
112-
113-
# task_master_input = {
114-
# "inputs": task.get("inputs", []),
115-
# "outputs": task.get("outputs", []),
116-
# "volumes": task.get("volumes", []),
117-
# "resources": {"disk_gb": task["resources"].get("disk_gb", 10.0)},
118-
# }
119-
# task_master_input[TASKMASTER_INPUT_EXEC_KEY] = executors_as_jobs
120-
121-
# task_master_input_as_json = json.dumps(task_master_input)
122-
# try:
123-
# with BytesIO() as obj:
124-
# with gzip.GzipFile(fileobj=obj, mode="wb") as gzip_file:
125-
# gzip_file.write(task_master_input_as_json.encode("utf-8"))
126-
# task_master_config_map.binary_data = {
127-
# f"{TASKMASTER_INPUT}.gz": obj.getvalue()
128-
# }
129-
# except Exception as e:
130-
# logger.info(
131-
# f"Compression of task {task_master_config_map.metadata.name} JSON configmap failed",
132-
# e,
133-
# )
134-
135-
# return task_master_config_map
84+
def from_tes_task_to_k8s_config_map(
85+
self,
86+
task: TesTask,
87+
# user,
88+
job,
89+
):
90+
task_master_config_map = V1ConfigMap(
91+
metadata=V1ObjectMeta(name=job.metadata.name)
92+
)
93+
task_master_config_map.metadata.annotations[
94+
self.constants.ann_testask_name_key
95+
] = task["name"]
96+
# task_master_config_map.metadata.labels[self.constants.label_userid_key] = user["username"]
97+
98+
if "tags" in task and "GROUP_NAME" in task["tags"]:
99+
task_master_config_map.metadata.labels[
100+
self.constants.label_groupname_key
101+
] = task["tags"]["GROUP_NAME"]
102+
# elif user["is_member"]:
103+
# task_master_config_map.metadata.labels[self.constants.label_groupname_key] = user[
104+
# "any_group"
105+
# ]
106+
107+
executors_as_jobs = [
108+
self.from_tes_executor_to_k8s_job(
109+
task_master_config_map.metadata.name,
110+
task["name"],
111+
executor,
112+
idx,
113+
task["resources"],
114+
# user,
115+
)
116+
for idx, executor in enumerate(task["executors"])
117+
]
118+
119+
task_master_input = {
120+
"inputs": task.inputs or [],
121+
"outputs": task.outputs or [],
122+
"volumes": task.volumes or [],
123+
"resources": {"disk_gb": task.resources.disk_gb or 10.0},
124+
}
125+
task_master_input[self.constants.taskmaster_input_exec_key] = executors_as_jobs
126+
127+
task_master_input_as_json = json.dumps(task_master_input)
128+
try:
129+
with BytesIO() as obj:
130+
with gzip.GzipFile(fileobj=obj, mode="wb") as gzip_file:
131+
gzip_file.write(task_master_input_as_json.encode("utf-8"))
132+
task_master_config_map.binary_data = {
133+
f"{self.constants.taskmaster_input}.gz": obj.getvalue()
134+
}
135+
except Exception as e:
136+
logger.info(
137+
f"Compression of task {task_master_config_map.metadata.name} JSON configmap failed",
138+
e,
139+
)
140+
141+
return task_master_config_map
142+
143+
144+
def from_tes_executor_to_k8s_job(
145+
self,
146+
generated_task_id: str,
147+
tes_task_name: str,
148+
executor: TesExecutor,
149+
executor_index: int,
150+
resources: TesResources,
151+
# user: User
152+
) -> V1Job:
153+
# Get new template executor Job object
154+
job = self.executor_template_supplier()
155+
156+
# Set executors name based on taskmaster's job name
157+
Job(job).change_job_name(Task(generated_task_id).get_executor_name(executor_index))
158+
159+
# Put arbitrary labels and annotations
160+
job.metadata.labels = job.metadata.labels or {}
161+
job.metadata.labels["taskId"] = generated_task_id
162+
job.metadata.labels["execNo"] = str(executor_index)
163+
job.metadata.labels["userId"] = user.username
164+
165+
job.metadata.annotations = job.metadata.annotations or {}
166+
job.metadata.annotations["tesTaskName"] = tes_task_name
167+
168+
container = job.spec.template.spec.containers[0]
169+
170+
# Convert potential TRS URI into docker image
171+
container.image = self.trs_client.get_docker_image_for_tool_version_uri(executor.image)
172+
173+
# Map executor's command to job container's command
174+
for command in ExecutorCommandWrapper(executor).get_commands_with_stream_redirects():
175+
container.add_command_item(command)
176+
177+
if executor.env:
178+
container.env = [V1EnvVar(name=key, value=value) for key, value in executor.env.items()]
179+
180+
container.working_dir = executor.workdir
181+
182+
if resources.cpu_cores:
183+
container.resources.requests['cpu'] = QuantityFormatter().parse(str(resources.cpu_cores))
184+
185+
if resources.ram_gb:
186+
container.resources.requests['memory'] = QuantityFormatter().parse(f"{resources.ram_gb:.6f}Gi")
187+
188+
return job

tesk/api/kubernetes/template.py

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,16 @@
44
import uuid
55

66
from kubernetes.client import (
7+
V1Container,
78
V1EnvVar,
9+
V1JobSpec,
10+
V1ObjectMeta,
11+
V1PodSpec,
12+
V1PodTemplateSpec,
13+
V1ResourceRequirements,
14+
V1SecretVolumeSource,
15+
V1Volume,
16+
V1VolumeMount,
817
)
918
from kubernetes.client.models import V1Job
1019

@@ -19,7 +28,7 @@
1928
class KubernetesTemplateSupplier:
2029
"""Templates for tasmaster's and executor's job object.."""
2130

22-
def __init__(self, namespace=TeskConstants.tesk_namespace):
31+
def __init__(self, namespace=TeskConstants.tesk_namespace, security_context = None):
2332
"""Initialize the converter."""
2433
self.taskmaster_template: V1Job = get_taskmaster_template()
2534
self.taskmaster_env_properties: TaskmasterEnvProperties = (
@@ -28,6 +37,7 @@ def __init__(self, namespace=TeskConstants.tesk_namespace):
2837
self.constants = Constants()
2938
self.k8s_constants = K8sConstants()
3039
self.namespace = namespace
40+
self.security_context= security_context
3141

3242
def get_task_master_name(self) -> str:
3343
"""Generate a unique name for the taskmaster job."""
@@ -89,3 +99,48 @@ def task_master_template(self) -> V1Job:
8999
)
90100

91101
return job
102+
103+
def executor_template(self):
104+
container = V1Container(resources=V1ResourceRequirements())
105+
106+
if self.taskmaster_env_properties.executorSecret is not None:
107+
container.volume_mounts = [
108+
V1VolumeMount(
109+
read_only=True,
110+
name=self.taskmaster_env_properties.executorSecret.name,
111+
mount_path=self.taskmaster_env_properties.executorSecret.mountPath,
112+
)
113+
]
114+
115+
pod_spec = V1PodSpec(
116+
containers=[container],
117+
restart_policy=self.k8s_constants.job_restart_policy,
118+
)
119+
120+
if self.security_context:
121+
pod_spec.security_context = self.security_context
122+
123+
job = V1Job(
124+
api_version=self.k8s_constants.k8s_batch_api_version,
125+
kind=self.k8s_constants.k8s_batch_api_job_type,
126+
metadata=V1ObjectMeta(
127+
labels={
128+
self.constants.label_jobtype_key: self.constants.label_jobtype_value_exec
129+
}
130+
),
131+
spec=V1JobSpec(
132+
template=V1PodTemplateSpec(metadata=V1ObjectMeta(), spec=pod_spec)
133+
),
134+
)
135+
136+
if self.taskmaster_env_properties.executorSecret is not None:
137+
job.spec.template.spec.volumes = [
138+
V1Volume(
139+
name=self.taskmaster_env_properties.executorSecret.name,
140+
secret=V1SecretVolumeSource(
141+
secret_name=self.taskmaster_env_properties.executorSecret.name
142+
),
143+
)
144+
]
145+
146+
return job

tesk/constants.py

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,27 @@
44

55

66
class TeskConstants:
7-
"""Tesk scoped constants."""
7+
"""Tesk's K8s scoped constants."""
88

99
filer_image_name: str = os.getenv(
10-
"FILER_IMAGE_NAME", "docker.io/elixircloud/tesk-core-filer"
10+
"TESK_API_TASKMASTER_FILER_IMAGE_NAME", "docker.io/elixircloud/tesk-core-filer"
11+
)
12+
filer_image_version: str = os.getenv(
13+
"TESK_API_TASKMASTER_FILER_IMAGE_VERSION", "latest"
1114
)
12-
filer_image_version: str = os.getenv("FILER_IMAGE_VERSION", "latest")
1315
taskmaster_image_name: str = os.getenv(
14-
"TASKMASTER_IMAGE_NAME", "docker.io/elixircloud/tesk-core-taskmaster"
16+
"TESK_API_TASKMASTER_IMAGE_NAME", "docker.io/elixircloud/tesk-core-taskmaster"
17+
)
18+
taskmaster_image_version: str = os.getenv(
19+
"TESK_API_TASKMASTER_IMAGE_VERSION", "latest"
20+
)
21+
tesk_namespace: str = os.getenv("TESK_API_K8S_NAMESPACE", "tesk")
22+
taskmaster_service_account_name: str = os.getenv(
23+
"TESK_API_TASKMASTER_SERVICE_ACCOUNT_NAME", "taskmaster"
24+
)
25+
taskmaster_environement_executor_backoff_limit: str = os.getenv(
26+
"ENVIRONMENT_EXECUTOR_BACKOFF_LIMIT", "2"
27+
)
28+
filer_environment_filer_backoff_limit: str = os.getenv(
29+
"TESK_API_TASKMASTER_ENVIRONMENT_FILER_BACKOFF_LIMIT", "2"
1530
)
16-
taskmaster_image_version: str = os.getenv("TASKMASTER_IMAGE_VERSION", "latest")
17-
tesk_namespace: str = os.getenv("TESK_NAMESPACE", "tesk")

0 commit comments

Comments
 (0)