Skip to content

Commit 6f5a6cb

Browse files
authored
Merge pull request #34 from FrontierDevelopmentLab/feat/append-data
Feat/append data
2 parents d1b576d + 03f83e2 commit 6f5a6cb

5 files changed

Lines changed: 165 additions & 37 deletions

File tree

conf/config.yaml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,11 @@ output: ./output/${dataset_name}
44
log_path: ${output}/main.log
55
credentials: ${output}/token.json
66
gpd_input: ${output}/aoi.geojson
7-
item_collection: ${output}/item_collection.geojson
7+
item_collection: ???
88
tiles: ${output}/tiles.pkl
9-
extraction_tasks: ${output}/extraction_tasks.pkl
9+
extraction_tasks: ???
10+
11+
overwrite: false
1012

1113
start_date: 2020-01-01
1214
end_date: 2020-02-01

src/satextractor/cli.py

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@
1414
1515
Also see (1) from http://click.pocoo.org/5/setuptools/#setuptools-integration
1616
"""
17+
import base64
18+
import datetime
19+
import hashlib
1720
import os
1821
import pickle
1922

@@ -127,6 +130,7 @@ def preparer(cfg):
127130
cfg.constellations,
128131
f"{cfg.cloud.storage_prefix}/{cfg.cloud.storage_root}/{cfg.dataset_name}",
129132
cfg.tiler.bbox_size,
133+
cfg.overwrite,
130134
)
131135

132136

@@ -136,9 +140,13 @@ def deployer(cfg):
136140
extraction_tasks = pickle.load(open(cfg.extraction_tasks, "rb"))
137141

138142
topic = f"projects/{cfg.cloud.project}/topics/{'-'.join([cfg.cloud.user_id, 'stacextractor'])}"
143+
job_id = (
144+
f"{cfg.cloud.user_id}_{datetime.datetime.now()}_{cfg.extraction_tasks[0:10]}"
145+
)
139146

140147
hydra.utils.call(
141148
cfg.deployer,
149+
job_id,
142150
cfg.credentials,
143151
extraction_tasks,
144152
f"{cfg.cloud.storage_prefix}/{cfg.cloud.storage_root}/{cfg.dataset_name}",
@@ -184,7 +192,23 @@ def main(cfg: DictConfig):
184192
"plugins",
185193
], "valid tasks are [build, stac, tile, schedule, prepare, deploy, plugins]"
186194

187-
logger.info(f"Running tasks {cfg.tasks}")
195+
# prepare a hashed representation of key config values (start time, end time, constellations)
196+
hash_vals = (
197+
"_".join(
198+
[cfg.start_date, cfg.end_date]
199+
+ [constellation for constellation in cfg.constellations],
200+
)
201+
).encode("utf-8")
202+
203+
hash_str_b = hashlib.md5(hash_vals).digest()
204+
hash_str = base64.urlsafe_b64encode(hash_str_b).decode("utf-8").rstrip("=")
205+
206+
cfg.item_collection = os.path.join(cfg.output, f"{hash_str}_item_collection.pkl")
207+
cfg.extraction_tasks = os.path.join(cfg.output, f"{hash_str}_extraction_tasks.pkl")
208+
209+
pickle.dump(cfg, open(os.path.join(cfg.output, f"{hash_str}_cfg.pkl"), "wb"))
210+
211+
logger.info(f"Running tasks {cfg.tasks} for cfg {hash_str}")
188212

189213
if "build" in cfg.tasks:
190214
build(cfg)

src/satextractor/deployer/gcp_deployer.py

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,20 @@
11
import concurrent
2-
import hashlib
32
import json
4-
from datetime import datetime
53

64
from google.cloud import pubsub_v1
75
from loguru import logger
86
from satextractor.models.constellation_info import BAND_INFO
97
from tqdm import tqdm
108

119

12-
def deploy_tasks(credentials, extraction_tasks, storage_path, chunk_size, topic):
13-
14-
user_id = topic.split("/")[-1].split("-")[0]
15-
16-
job_id = hashlib.sha224(
17-
(user_id + str(datetime.now())).encode(),
18-
).hexdigest()[:10]
10+
def deploy_tasks(
11+
job_id,
12+
credentials,
13+
extraction_tasks,
14+
storage_path,
15+
chunk_size,
16+
topic,
17+
):
1918

2019
logger.info(f"Deploying {len(extraction_tasks)} tasks with job_id: {job_id}")
2120

src/satextractor/preparer/gcp_preparer.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ def gcp_prepare_archive(
2323
constellations: List[str],
2424
storage_root: str,
2525
patch_size: int,
26+
overwrite: bool,
2627
chunk_size: int,
2728
n_jobs: int = -1,
2829
verbose: int = 0,
@@ -69,7 +70,7 @@ def gcp_prepare_archive(
6970
):
7071
Parallel(n_jobs=n_jobs, verbose=verbose, prefer="threads")(
7172
[
72-
delayed(zarr.open)(fs.get_mapper(f"{storage_root}/{tile_id}"))
73+
delayed(zarr.open)(fs.get_mapper(f"{storage_root}/{tile_id}"), "a")
7374
for tile_id, _ in items
7475
],
7576
)
@@ -88,6 +89,7 @@ def gcp_prepare_archive(
8889
sensing_times,
8990
constellation,
9091
BAND_INFO[constellation],
92+
overwrite,
9193
),
9294
)
9395

Lines changed: 125 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
1+
import datetime
2+
13
import numpy as np
24
import zarr
5+
from zarr.errors import ArrayNotFoundError
6+
from zarr.errors import ContainsArrayError
7+
from zarr.errors import ContainsGroupError
8+
from zarr.errors import PathNotFoundError
39

410

511
def create_zarr_patch_structure(
@@ -11,34 +17,129 @@ def create_zarr_patch_structure(
1117
sensing_times,
1218
constellation,
1319
bands,
20+
overwrite,
1421
):
1522
if not sensing_times.size == 0:
1623
patch_size_pixels = patch_size // min(b["gsd"] for _, b in bands.items())
1724

1825
patch_constellation_path = f"{storage_path}/{tile_id}/{constellation}"
19-
zarr.open(fs_mapper(patch_constellation_path), mode="a")
26+
zarr.open(
27+
fs_mapper(patch_constellation_path),
28+
mode="a",
29+
) # make sure the path exists
2030

2131
patch_path = f"{patch_constellation_path}/data"
22-
zarr.open_array(
23-
fs_mapper(patch_path),
24-
"w",
25-
shape=(
26-
len(sensing_times),
27-
len(bands),
28-
int(patch_size_pixels),
29-
int(patch_size_pixels),
30-
),
31-
chunks=(1, 1, int(chunk_size), int(chunk_size)),
32-
dtype=np.uint16,
33-
)
34-
35-
# Create timestamps array
36-
timestamps_path = f"{patch_constellation_path}/timestamps"
37-
z_dates = zarr.open_array(
38-
fs_mapper(f"{timestamps_path}"),
39-
mode="w",
40-
shape=(len(sensing_times)),
41-
chunks=(len(sensing_times)),
42-
dtype="<U27",
43-
)
44-
z_dates[:] = sensing_times
32+
33+
if overwrite:
34+
zarr.open_array(
35+
fs_mapper(patch_path),
36+
"w",
37+
shape=(
38+
len(sensing_times),
39+
len(bands),
40+
int(patch_size_pixels),
41+
int(patch_size_pixels),
42+
),
43+
chunks=(1, 1, int(chunk_size), int(chunk_size)),
44+
dtype=np.uint16,
45+
)
46+
47+
# Create timestamps array
48+
timestamps_path = f"{patch_constellation_path}/timestamps"
49+
z_dates = zarr.open_array(
50+
fs_mapper(f"{timestamps_path}"),
51+
mode="w",
52+
shape=(len(sensing_times)),
53+
chunks=(len(sensing_times)),
54+
dtype="<U27",
55+
)
56+
z_dates[:] = sensing_times
57+
58+
else:
59+
60+
# read current timestamps
61+
timestamps_path = f"{patch_constellation_path}/timestamps"
62+
63+
try:
64+
existing_timestamps = zarr.open_array(fs_mapper(timestamps_path), "r")[
65+
:
66+
]
67+
existing_timestamps = np.array(
68+
[
69+
np.datetime64(datetime.datetime.fromisoformat(el))
70+
for el in existing_timestamps
71+
],
72+
)
73+
74+
new_timesteps = np.array(sensing_times)[
75+
~np.isin(sensing_times, existing_timestamps)
76+
]
77+
78+
if new_timesteps.shape[0] > 0:
79+
assert max(existing_timestamps) <= min(
80+
new_timesteps,
81+
), "Sat-Extractor can only append more recent data or overwrite existing data. "
82+
83+
# get union of sensing times
84+
timestamps_union = np.union1d(existing_timestamps, sensing_times)
85+
86+
except (PathNotFoundError, ContainsGroupError, ArrayNotFoundError):
87+
timestamps_union = sensing_times
88+
except Exception as e:
89+
raise e
90+
91+
# write sensing times fresh
92+
z_dates = zarr.open_array(
93+
fs_mapper(f"{timestamps_path}"),
94+
mode="w",
95+
shape=(len(timestamps_union)),
96+
chunks=(len(timestamps_union)),
97+
dtype="<U27",
98+
)
99+
z_dates[:] = timestamps_union
100+
101+
# resize any existing array based thereon
102+
103+
# data
104+
try:
105+
# if it doesn't exist, create it.
106+
z_data = zarr.open_array(
107+
fs_mapper(patch_path),
108+
"w-",
109+
shape=(
110+
len(timestamps_union),
111+
len(bands),
112+
int(patch_size_pixels),
113+
int(patch_size_pixels),
114+
),
115+
chunks=(1, 1, int(chunk_size), int(chunk_size)),
116+
dtype=np.uint16,
117+
)
118+
except ContainsArrayError:
119+
z_data = zarr.open_array(fs_mapper(patch_path), "r+")
120+
121+
data_shape = z_data.shape
122+
z_data.resize(len(timestamps_union), *data_shape[1:])
123+
124+
except Exception as e:
125+
raise e
126+
127+
# masks
128+
mask_root_path = f"{patch_constellation_path}/mask"
129+
z_mask_dir = zarr.open_group(
130+
fs_mapper(mask_root_path),
131+
"a",
132+
)
133+
134+
for mask_key in z_mask_dir.keys():
135+
136+
mask_path = f"{mask_root_path}/{mask_key}"
137+
138+
z_mask = zarr.open_array(
139+
fs_mapper(mask_path),
140+
"r+",
141+
)
142+
143+
mask_shape = z_mask.shape
144+
145+
z_mask.resize(len(timestamps_union), *mask_shape[1:])

0 commit comments

Comments (0)