
Commit 77042d8

added consolidated metadata support
1 parent 55c8598 commit 77042d8

7 files changed

Lines changed: 250 additions & 31 deletions

File tree

hsds/async_lib.py
hsds/datanode_lib.py
hsds/domain_sn.py
hsds/servicenode_lib.py
hsds/util/storUtil.py

hsds/async_lib.py

Lines changed: 180 additions & 8 deletions
@@ -17,15 +17,15 @@
 from aiohttp.web_exceptions import HTTPForbidden
 from h5json.hdf5dtype import getItemSize
 from h5json.hdf5dtype import createDataType
-from h5json.array_util import getNumElements, bytesToArray
+from h5json.array_util import getNumElements, bytesToArray, bytesArrayToList
 from h5json.objid import isValidUuid, isSchema2Id, getS3Key, isS3ObjKey
 from h5json.objid import getObjId, isValidChunkId, getCollectionForId
 from h5json.filters import getFilters
-from h5json.shape_util import getShapeDims
+from h5json.shape_util import getShapeDims, getDataSize
 from h5json.dset_util import getDatasetLayoutClass, getDatasetLayout, getChunkDims
 from h5json.time_util import getNow

-from .util.chunkUtil import getDatasetId, getNumChunks, ChunkIterator
+from .util.chunkUtil import getDatasetId, getNumChunks, ChunkIterator, getChunkIndex, getChunkIds
 from .util.dsetUtil import getHyperslabSelection
 from .util.storUtil import getStorKeys, putStorJSONObj, getStorJSONObj
 from .util.storUtil import deleteStorObj, getStorBytes, isStorObj
@@ -77,6 +77,7 @@ async def updateDatasetInfo(app, dset_id, dataset_info, bucket=None):
         msg += f"{dset_id}"
         log.warn(msg)
         return
+
     type_json = dset_json["type"]
     item_size = getItemSize(type_json)
     if not getDatasetLayout(dset_json):
@@ -112,7 +113,7 @@ async def updateDatasetInfo(app, dset_id, dataset_info, bucket=None):
     if layout_class == "H5D_CONTIGUOUS_REF":
         # In H5D_CONTIGUOUS_REF a non-compressed part of the HDF5 is divided
         # into equal size chunks, so we can just compute link bytes and num
-        # chunks based on the size of the coniguous dataset
+        # chunks based on the size of the contiguous dataset
         layout_dims = getChunkDims(dset_json)
         num_chunks = getNumChunks(selection, layout_dims)
         chunk_size = item_size
@@ -268,20 +269,26 @@ def scanRootCallback(app, s3keys):
     results = app["scanRoot_results"]
     scanRoot_keyset = app["scanRoot_keyset"]
     checksums = results["checksums"]
+
     for s3key in s3keys.keys():

         if not isS3ObjKey(s3key):
-            log.info(f"not s3obj key, ignoring: {s3key}")
+            log.info(f"scanRoot - not s3obj key, ignoring: {s3key}")
             continue
         if s3key in scanRoot_keyset:
             log.warn(f"scanRoot - dejavu for key: {s3key}")
             continue
         scanRoot_keyset.add(s3key)
-        msg = f"scanRoot adding key: {s3key} to keyset, "
+        msg = f"scanRoot - adding key: {s3key} to keyset, "
         msg += f"{len(scanRoot_keyset)} keys"
         log.debug(msg)

         objid = getObjId(s3key)
+
+        if objid in app["deleted_ids"]:
+            log.debug(f"scanRoot - skipping deleted id: {objid}")
+            continue
+
         etag = None
         obj_size = None
         lastModified = None
@@ -306,8 +313,15 @@ def scanRootCallback(app, s3keys):
             is_chunk = True
             results["num_chunks"] += 1
             results["allocated_bytes"] += obj_size
+            chunk_index = getChunkIndex(objid)
+            if max(chunk_index) == 0:
+                # save the first chunk if present
+                # this will be used to save dataset values to
+                # the obj_ids set for small datasets
+                results["obj_ids"].add(objid)
         else:
             results["metadata_bytes"] += obj_size
+            results["obj_ids"].add(objid)

         if is_chunk or getCollectionForId(objid) == "datasets":
             if is_chunk:
@@ -345,6 +359,144 @@ def scanRootCallback(app, s3keys):
             log.error(msg)


+async def _getDatasetValueJson(app, dset_id, dset_json, obj_ids, size_limit=None, bucket=None):
+    """ If the dataset size is less than size_limit, and the chunk_ids for the dataset are
+    available, return a JSON representation of the dataset values. Otherwise, return None """
+
+    dims = getShapeDims(dset_json)
+    if dims is None:
+        return None  # null dataspace
+    if "type" not in dset_json:
+        msg = f"_getDatasetValueJson - expected to find type in dataset_json for {dset_id}"
+        log.warn(msg)
+        return None
+    type_json = dset_json["type"]
+    item_size = getItemSize(type_json)
+    if item_size == "H5T_VARIABLE":
+        item_size = 1024  # make a guess for variable length types
+    dataset_size = getDataSize(dims, item_size)
+    if size_limit is not None and dataset_size > size_limit:
+        log.debug(f"_getDatasetValueJson - dataset size {dataset_size} exceeds limit {size_limit}")
+        return None
+
+    chunk_dims = getChunkDims(dset_json)
+    if not chunk_dims:
+        log.warning(f"_getDatasetValueJson - no layout found for dataset: {dset_id}")
+        return None
+    if chunk_dims != dims:
+        msg = f"_getDatasetValueJson - dataset layout {chunk_dims} does not match dims {dims} "
+        msg += f"for dataset: {dset_id}, ignoring"
+        log.warning(msg)
+        return None
+    select_all = getHyperslabSelection(dims)  # select entire datashape
+    chunk_ids = getChunkIds(dset_id, select_all, dims)
+    if len(chunk_ids) == 0:
+        log.debug(f"_getDatasetValueJson - no chunk ids found for dataset: {dset_id}")
+        return None
+    if len(chunk_ids) > 1:
+        log.debug(f"_getDatasetValueJson - more than one chunk id found for dataset: {dset_id}")
+        return None
+    chunk_id = chunk_ids[0]
+    if chunk_id not in obj_ids:
+        log.debug(f"_getDatasetValueJson - chunk id {chunk_id} not in scanned obj_ids")
+        return None
+    log.debug(f"using chunk: {chunk_id} to get dataset value for {dset_id}")
+
+    # fetch the chunk - using getStorBytes since this will not be used with
+    # chunk cache or chunk crawlers
+    # TBD: need parameters for s3path, s3offset, s3size for ref layouts
+    # regular store read
+
+    filters = getFilters(dset_json)
+    dt = createDataType(type_json)
+    filter_ops = getFilterOps(app, dset_id, filters, dtype=dt, chunk_shape=chunk_dims)
+
+    kwargs = {
+        "filter_ops": filter_ops,
+        "offset": None,
+        "length": None,
+        "bucket": bucket
+    }
+    s3key = getS3Key(chunk_id)
+
+    try:
+        chunk_bytes = await getStorBytes(app, s3key, **kwargs)
+    except HTTPNotFound:
+        log.warning(f"_getDatasetValueJson - HTTPNotFound for chunk {chunk_id} bucket:{bucket}")
+        return None
+    except HTTPForbidden:
+        log.warning(f"_getDatasetValueJson - HTTPForbidden for chunk {chunk_id} bucket:{bucket}")
+        return None
+    except HTTPInternalServerError:
+        msg = "_getDatasetValueJson - "
+        msg += f"HTTPInternalServerError for chunk {chunk_id} bucket:{bucket}"
+        log.warning(msg)
+        return None
+
+    if chunk_bytes is None:
+        msg = f"_getDatasetValueJson - read {chunk_id} bucket: {bucket} returned None"
+        log.warning(msg)
+        return None
+
+    arr = bytesToArray(chunk_bytes, dt, chunk_dims)
+
+    json_value = bytesArrayToList(arr)
+    log.debug(f"_getDatasetValueJson - returning {json_value}")
+
+    return json_value
+
+
+async def getConsolidatedMetaData(app, obj_ids, bucket=None):
+    # create a consolidated metadata summary for all objects in the domain
+    # return a dict of obj_ids to their metadata summaries
+    log.info("getConsolidatedMetaData - creating consolidated metadata summary")
+    consolidated_metadata = {}
+    for obj_id in obj_ids:
+        if isValidChunkId(obj_id):
+            # skip chunks - we may use the chunk later when processing its dataset object
+            continue
+        s3_key = getS3Key(obj_id)
+        try:
+            obj_json = await getStorJSONObj(app, s3_key, bucket=bucket)
+        except HTTPNotFound:
+            log.warn(f"HTTPNotFound for {s3_key} bucket:{bucket}")
+            continue
+        except HTTPForbidden:
+            log.warn(f"HTTPForbidden error for {s3_key} bucket:{bucket}")
+            continue
+        except HTTPInternalServerError:
+            msg = f"HTTPInternalServerError error for {s3_key} bucket:{bucket}"
+            log.warn(msg)
+            continue
+        log.debug(f"getConsolidatedMetaData - got json for obj_id: {obj_id}: {obj_json}")
+        # extract relevant metadata
+        metadata_summary = {}
+        if "type" in obj_json:
+            metadata_summary["type"] = obj_json["type"]
+        if "shape" in obj_json:
+            metadata_summary["shape"] = obj_json["shape"]
+        if "attributes" in obj_json:
+            metadata_summary["attributes"] = obj_json["attributes"]
+        if "links" in obj_json:
+            metadata_summary["links"] = obj_json["links"]
+        if "creationProperties" in obj_json:
+            metadata_summary["creationProperties"] = obj_json["creationProperties"]
+        if getCollectionForId(obj_id) == "datasets":
+            log.debug("getConsolidatedMetaData - got dataset")
+            size_limit = 4096  # TBD - make this a config
+            kwargs = {"size_limit": size_limit, "bucket": bucket}
+            json_value = await _getDatasetValueJson(app, obj_id, obj_json, obj_ids, **kwargs)
+            if json_value is not None:
+                log.debug(f"adding dataset value to metadata summary for dataset: {obj_id}")
+                metadata_summary["value"] = json_value
+        else:
+            log.debug("getConsolidatedMetaData - not a dataset")

+        consolidated_metadata[obj_id] = metadata_summary
+    log.info("getConsolidatedMetaData - done creating consolidated metadata summary")
+    return consolidated_metadata
+
+
 async def scanRoot(app, rootid, update=False, bucket=None):

     # iterate through all s3 keys under the given root.
@@ -386,7 +538,8 @@ async def scanRoot(app, rootid, update=False, bucket=None):
     results["num_linked_chunks"] = 0
     results["linked_bytes"] = 0
     results["logical_bytes"] = 0
-    results["checksums"] = {}  # map of objid to checksums
+    results["obj_ids"] = set()  # set of object ids scanned (and first chunk id for datasets)
+    results["checksums"] = {}  # map of objid to checksums
     results["bucket"] = bucket
     results["scan_start"] = getNow(app=app)

@@ -405,6 +558,9 @@ async def scanRoot(app, rootid, update=False, bucket=None):
     num_objects += len(results["datasets"])
     num_objects += results["num_chunks"]
     log.info(f"scanRoot - got {num_objects} keys for rootid: {rootid}")
+    obj_ids = results["obj_ids"]
+    log.info(f"scanRoot - got {len(obj_ids)} unique object ids")
+    log.debug(f"scanRoot - obj_ids: {obj_ids}")

     dataset_results = results["datasets"]
     for dsetid in dataset_results:
@@ -445,13 +601,29 @@ async def scanRoot(app, rootid, update=False, bucket=None):

     results["scan_complete"] = getNow(app=app)

+    # extract the obj_ids set - it won't go into .info.json
+    obj_ids = results["obj_ids"]
+    del results["obj_ids"]
+    log.debug(f"obj_ids set: {obj_ids}")
+
     if update:
         # write .info object back to S3
         info_key = root_prefix + ".info.json"
         msg = f"scanRoot - updating info key: {info_key} with results: "
         msg += f"{results}"
         log.info(msg)
         await putStorJSONObj(app, info_key, results, bucket=bucket)
+
+        # create a json summary of objects in this domain
+        log.debug(f"Creating consolidated metadata summary for root {rootid}")
+        summary_key = root_prefix + ".summary.json"
+        summary_data = await getConsolidatedMetaData(app, obj_ids, bucket=bucket)
+        if summary_data:
+            log.info(f"Got consolidated metadata summary for root {rootid}")
+            log.debug(f"Summary data: {summary_data}")
+            await putStorJSONObj(app, summary_key, summary_data, bucket=bucket)
+        else:
+            log.info(f"No consolidated metadata summary for root {rootid}")
     return results

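For context, a minimal sketch of what a resulting .summary.json object might look like, built from the metadata_summary fields copied above (type, shape, attributes, links, creationProperties, plus value for datasets that fit in one chunk under the size limit). The object ids, link names, and values below are hypothetical, invented for illustration:

# hypothetical .summary.json content for a domain with one group
# and one small dataset - ids and values are made up
summary_data = {
    "g-0568d8c5-a77e...": {
        "links": {"dset1": {"class": "H5L_TYPE_HARD", "id": "d-0568d8c5-a77e..."}},
    },
    "d-0568d8c5-a77e...": {
        "type": {"class": "H5T_INTEGER", "base": "H5T_STD_I32LE"},
        "shape": {"class": "H5S_SIMPLE", "dims": [4]},
        # "value" appears only for small, single-chunk datasets
        # whose first chunk was seen during the scan
        "value": [1, 2, 3, 4],
    },
}

Since the summary is written once per scan rather than assembled per request, a client can pick up all domain metadata with a single storage read.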
hsds/datanode_lib.py

Lines changed: 1 addition & 1 deletion
@@ -1094,7 +1094,7 @@ async def get_chunk(
         log.debug(msg)
     else:
         s3key = getS3Key(chunk_id)
-        log.debug(f"getChunk chunkid: {chunk_id} bucket: {bucket}")
+        log.debug(f"getChunk chunkid: {chunk_id} bucket: {bucket} using key: {s3key}")
     if chunk_id in chunk_cache:
         log.debug(f"getChunk chunkid: {chunk_id} found in cache")
         chunk_arr = chunk_cache[chunk_id]

hsds/domain_sn.py

Lines changed: 6 additions & 14 deletions
@@ -462,6 +462,11 @@ async def GET_Domain(request):
     if "verbose" in params and params["verbose"]:
         verbose = True

+    getobjs = False
+    # include domain objects if requested
+    if params.get("getobjs"):
+        getobjs = True
+
     if not domain:
         log.info("no domain passed in, returning all top-level domains")
         # no domain passed in, return top-level domains for this request
@@ -543,22 +548,9 @@ async def GET_Domain(request):
         return resp

     # return just the keys as per the REST API
-    kwargs = {"verbose": verbose, "bucket": bucket}
+    kwargs = {"verbose": verbose, "getobjs": getobjs, "bucket": bucket}
     rsp_json = await getDomainResponse(app, domain_json, **kwargs)

-    # include domain objects if requested
-    if params.get("getobjs") and "root" in domain_json:
-
-        log.debug("getting all domain objects")
-        root_id = domain_json["root"]
-        kwargs = {"include_attrs": include_attrs, "bucket": bucket}
-        domain_objs = await getDomainObjects(app, root_id, **kwargs)
-        if domain_objs:
-            rsp_json["domain_objs"] = domain_objs
-
-    # include domain class if present
-    # if "class" in domain_json:
-    #     rsp_json["class"] = domain_json["class"]

     # include dn_ids if requested
     if "getdnids" in params and params["getdnids"]:

hsds/servicenode_lib.py

Lines changed: 33 additions & 2 deletions
@@ -116,7 +116,7 @@ async def getDomainJson(app, domain, reload=False):
     return domain_json


-async def getDomainResponse(app, domain_json, bucket=None, verbose=False):
+async def getDomainResponse(app, domain_json, bucket=None, verbose=False, getobjs=False):
     """ construct JSON response for domain request """
     rsp_json = {}
     if "root" in domain_json:
@@ -189,6 +189,13 @@ async def getDomainResponse(app, domain_json, bucket=None, verbose=False):
         rsp_json["num_linked_chunks"] = num_linked_chunks
         rsp_json["md5_sum"] = md5_sum

+    if getobjs and "root" in domain_json:
+        root_id = domain_json["root"]
+        domain_objs = await getDomainObjs(app, root_id, bucket=bucket)
+        if domain_objs:
+            log.debug(f"returning {len(domain_objs)} objects for root_id: {root_id}")
+            rsp_json["domain_objs"] = domain_objs
+
     # pass back config parameters the client may care about

     rsp_json["limits"] = getLimits()
@@ -849,8 +856,32 @@ async def getRootInfo(app, root_id, bucket=None):
     return info_json


+async def getDomainObjs(app, root_id, bucket=None):
+    """ Return domain objects if available for this root id """
+    log.debug(f"getDomainObjs {root_id}")
+
+    s3_key = getS3Key(root_id)
+
+    parts = s3_key.split("/")
+    # the root s3_key is in the format db/<root>/.group.json -
+    # use it to construct the key for the summary object: db/<root>/.summary.json
+    if len(parts) != 3:
+        log.error(f"Unexpected s3key format: {s3_key}")
+        return None
+
+    summary_key = f"db/{parts[1]}/.summary.json"
+
+    try:
+        summary_json = await getStorJSONObj(app, summary_key, bucket=bucket)
+    except HTTPNotFound:
+        log.warn(f".summary.json not found for key: {summary_key}")
+        return None
+
+    return summary_json
+
+
 async def doFlush(app, root_id, bucket=None):
-    """return wnen all DN nodes have wrote any pending changes to S3"""
+    """return when all DN nodes have written any pending changes to S3"""
     log.info(f"doFlush {root_id}")
     params = {"flush": 1}
     if bucket:

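The key handling in getDomainObjs is plain string manipulation; here is a standalone sketch of the derivation it performs, assuming the schema-2 root key layout described in the comment above (the root prefix in the example is made up):

# sketch of the summary-key derivation used by getDomainObjs
# a schema-2 root key has the form db/<root>/.group.json (3 parts),
# so parts[1] is the prefix used to build db/<root>/.summary.json
def summary_key_for_root(s3_key):
    parts = s3_key.split("/")
    if len(parts) != 3:
        return None  # unexpected key format
    return f"db/{parts[1]}/.summary.json"

# hypothetical root key
print(summary_key_for_root("db/0568d8c5-a77e/.group.json"))
# -> db/0568d8c5-a77e/.summary.json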
hsds/util/storUtil.py

Lines changed: 1 addition & 1 deletion
@@ -493,7 +493,7 @@ async def getStorBytes(app,
     chunk_bytes = []

     for chunk_location in chunk_locations:
-        log.debug(f"getStoreBytes - processing chunk_location: {chunk_location}")
+        log.debug(f"getStorBytes - processing chunk_location: {chunk_location}")
         n = chunk_location.offset - offset
         if n < 0:
             log.warn(f"getStorBytes - unexpected offset for chunk_location: {chunk_location}")
