Skip to content

Map over product files from a previous step #3020

@jhkennedy

Description

@jhkennedy

Right now, HyP3 can fan out steps by mapping over input parameters (see the LAVAS job spec).

For PISM, it would be ideal to instead map over the files under a previous job's job-ID prefix in the content bucket. For example, here is what we're currently doing to run the ensemble simulations:

import random
from copy import deepcopy
from pathlib import Path

import geopandas as gpd
import hyp3_sdk as sdk
import s3fs


# Content bucket of the HyP3 PISM test deployment; prep jobs stage their
# outputs under '<bucket>/<job_id>/...' here.
PISM_CLOUD_BUCKET = 'hyp3-pism-cloud-test-contentbucket-zs9dctrqrlvx'

# Base payload for the ensemble-preparation (stage) jobs. The per-glacier
# fields -- 'name', 'job_parameters.rgi_id', and 'job_parameters.ntasks' --
# are filled in on a deep copy before submission.
STAGE_TEMPLATE = {
    "job_type": "PISM_TERRA_PREP_ENSEMBLE",
    "job_parameters": {
        "rgi_gpkg": "s3://pism-cloud-data/terra/rgi.gpkg",
        "pism_config": "s3://pism-cloud-data/terra/era5_ec2.toml",
        "run_template": "s3://pism-cloud-data/terra/ec2.j2",
        "uq_config": "s3://pism-cloud-data/terra/era5_agu.toml",
    },
}

# Base payload for the execute jobs. The per-run fields -- 'name',
# 'job_parameters.ensemble_job_id', and 'job_parameters.run_script' --
# are filled in on a deep copy before submission.
EXECUTE_TEMPLATE = {
    "job_type": "PISM_TERRA_EXECUTE",
    "job_parameters": {},
}


def get_run_scripts(job: sdk.Job) -> list[str]:
    """Return the run scripts staged by a prep job, as paths relative to its job-id prefix.

    Lists '<bucket>/<job_id>/<rgi_id>/run_scripts' in the content bucket
    (anonymous access) and strips the '<bucket>/<job_id>/' prefix so the
    results can be passed directly as the 'run_script' job parameter.
    """
    s3 = s3fs.S3FileSystem(anon=True)
    job_prefix = f'{PISM_CLOUD_BUCKET}/{job.job_id}'
    rgi_id = job.job_parameters["rgi_id"]
    listing = s3.ls(f'{job_prefix}/{rgi_id}/run_scripts')
    return [str(Path(entry).relative_to(f'{job_prefix}/')) for entry in listing]


# rgi_info was derived from the RGI geopackage roughly like this:
#   rgi = gpd.read_file("pism-terra/data/rgi/rgi.gpkg").sort_values("area_km2", ascending=False)
#   rgi_ids = rgi[(rgi["o1region"] == "01") & (rgi["area_km2"] >= 500)].rgi_id.to_list()
# Glaciers to simulate, largest first, paired with the MPI task count
# chosen for each one.
_RGI_IDS = [
    'RGI2000-v7.0-C-01-09429',
    'RGI2000-v7.0-C-01-11818',
    'RGI2000-v7.0-C-01-08012',
    'RGI2000-v7.0-C-01-14612',
    'RGI2000-v7.0-C-01-04374',
    'RGI2000-v7.0-C-01-12784',
    'RGI2000-v7.0-C-01-03383',
    'RGI2000-v7.0-C-01-08153',
    'RGI2000-v7.0-C-01-03718',
    'RGI2000-v7.0-C-01-01407',
    'RGI2000-v7.0-C-01-03102',
    'RGI2000-v7.0-C-01-06260',
    'RGI2000-v7.0-C-01-16106',
    'RGI2000-v7.0-C-01-04024',
    'RGI2000-v7.0-C-01-14978',
    'RGI2000-v7.0-C-01-08332',
    'RGI2000-v7.0-C-01-16008',
    'RGI2000-v7.0-C-01-14907',
]
_NTASKS = [32, 32, 32, 32, 32, 32, 24, 28, 32, 28, 32, 20, 20, 32, 12, 24, 16, 20]
# (rgi_id, ntasks) pairs consumed by the submission loop below.
rgi_info = list(zip(_RGI_IDS, _NTASKS))


def _stage_job(rgi_id: str, ntasks: int) -> dict:
    """Build one ensemble-prep job payload from STAGE_TEMPLATE."""
    job_dict = deepcopy(STAGE_TEMPLATE)
    # The job name combines the glacier id with the config file's stem,
    # e.g. 'RGI2000-v7.0-C-01-09429_era5_ec2'.
    config_stem = Path(job_dict['job_parameters']['pism_config']).stem
    job_dict['name'] = f'{rgi_id}_{config_stem}'
    job_dict['job_parameters']['rgi_id'] = rgi_id
    job_dict['job_parameters']['ntasks'] = ntasks
    return job_dict


prepared_jobs = [_stage_job(rgi_id, ntasks) for rgi_id, ntasks in rgi_info]



hyp3 = sdk.HyP3('https://pism-cloud-test.asf.alaska.edu')

# Submit the ensemble-prep jobs and block until they all complete.
jobs = hyp3.watch(hyp3.submit_prepared_jobs(prepared_jobs))

# Fan out: one execute job per run script staged by each prep job.
prepared_jobs = []
for prep_job in jobs:
    for run_script in get_run_scripts(prep_job):
        execute_job = deepcopy(EXECUTE_TEMPLATE)
        execute_job['name'] = prep_job.name
        execute_job['job_parameters']['ensemble_job_id'] = prep_job.job_id
        execute_job['job_parameters']['run_script'] = run_script
        prepared_jobs.append(execute_job)

# Randomize submission order (presumably to spread each glacier's runs
# across the queue rather than submitting them in contiguous batches).
random.shuffle(prepared_jobs)

jobs += hyp3.submit_prepared_jobs(prepared_jobs)
jobs = hyp3.watch(jobs)

job_names = {job.name for job in jobs}
print(job_names)

Metadata

Metadata

Assignees

No one assigned

    Labels

    enhancementNew feature or request

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions