3737from collections .abc import Iterable , Sequence
3838from typing import Any
3939
40+ import astropy .units as apu
4041import numpy as np
4142import pandas as pd
43+
4244from lsst .daf .butler import Butler , DatasetRef , DatasetType
4345from lsst .daf .butler .utils import globToRegex
4446from lsst .pex .config import Field , ListField
6062# written in task metadata were platform-dependent. Once we no longer care
6163# about older runs, this import and the code that uses it can be removed.
6264from lsst .utils .usage import _RUSAGE_MEMORY_MULTIPLIER
65+ from lsst .verify import Measurement
66+
67+ from ..interfaces import MetricMeasurementBundle
68+ from ..interfaces ._task import _timestampValidator
6369
6470_LOG = logging .getLogger (__name__ )
6571
6672
def _resource_table_to_bundle(
    table: pd.DataFrame, dataset_identifier: str, reference_package: str, timestamp_version: str
) -> MetricMeasurementBundle:
    """Convert a resource usage table into a `MetricMeasurementBundle`.

    See `lsst.analysis.tools.interfaces.AnalysisPipelineTask` for more
    information on each of the following options.

    Parameters
    ----------
    table : `pandas.DataFrame`
        Resource usage in the form of a DataFrame. Must contain a ``task``
        column; every other column name must contain one of the known unit
        stubs (``quanta``, ``_hrs``, ``_GB``, ``_s``).
    dataset_identifier : `str`
        The name of the data processing to associate with this metric bundle.
    reference_package : `str`
        The reference package to use if the timestamp version is set to a
        package version.
    timestamp_version : `str`
        The type of timestamp to associate with the bundle.

    Returns
    -------
    bundle : `MetricMeasurementBundle`
        One entry per table row, keyed ``<task>_memrun``, each holding one
        `~lsst.verify.Measurement` per resource column.

    Raises
    ------
    ValueError
        Raised if a column name matches none of the known unit stubs.
    """
    bundle = MetricMeasurementBundle(
        dataset_identifier=dataset_identifier,
        reference_package=reference_package,
        timestamp_version=timestamp_version,
    )
    # All columns in the table become measurements, except "task", which
    # plays the role of the AnalysisTool name in the bundle.
    column_keys = set(table.keys())
    column_keys.remove("task")
    # Measurements need units; map a column-name stub to its unit. Order is
    # priority order: the first stub found in a column name wins.
    unit_mapping = (
        ("quanta", apu.Unit("count")),
        ("_hrs", apu.Unit("hour")),
        ("_GB", apu.Unit("Gbyte")),
        ("_s", apu.Unit("s")),
    )
    # For each row, grab the task name and create a list of measurements.
    for _, row in table.iterrows():
        task_name = f"{row['task']}_memrun"
        task_data = []
        for key in column_keys:
            # Stop at the first matching stub so the unit choice is
            # deterministic and follows the mapping's priority order
            # (the original scan-all-stubs loop let the last match win).
            for stub, unit in unit_mapping:
                if stub in key:
                    break
            else:
                # Name the offending column as well as the task so the
                # failure can actually be diagnosed.
                raise ValueError(
                    f"Could not determine units for column {key!r} of task {row['task']}"
                )
            task_data.append(Measurement(key, row[key] * unit))
        bundle[task_name] = task_data
    return bundle
123+
124+
67125class ConsolidateResourceUsageConnections (PipelineTaskConnections , dimensions = ()):
68126 """Connection definitions for `ConsolidateResourceUsageTask`."""
69127
@@ -74,6 +132,16 @@ class ConsolidateResourceUsageConnections(PipelineTaskConnections, dimensions=()
74132 doc = "Consolidated table of resource usage statistics. One row per task label" ,
75133 )
76134
135+ output_metrics = cT .Output (
136+ name = "ResourceUsageSummary_metrics" ,
137+ storageClass = "MetricMeasurementBundle" ,
138+ dimensions = (),
139+ doc = (
140+ "MetricMeasurementBundle with the same information as the ResourceUsageSummary in the form "
141+ "required for Sasquatch dispatch"
142+ ),
143+ )
144+
77145 def __init__ (self , * , config ):
78146 super ().__init__ (config = config )
79147 for name in self .config .input_names :
@@ -88,6 +156,8 @@ def __init__(self, *, config):
88156 ),
89157 )
90158 self .inputs .add (name )
159+ if not self .config .do_make_metrics :
160+ self .outputs .remove ("output_metrics" )
91161
92162
93163class ConsolidateResourceUsageConfig (
@@ -99,6 +169,25 @@ class ConsolidateResourceUsageConfig(
99169 doc = "Input resource usage dataset type names" ,
100170 default = [],
101171 )
172+ do_make_metrics = Field [bool ](doc = "Make metric bundle in addition to DataFrame" , default = False )
173+ dataset_identifier = Field [str ](doc = "An identifier to be associated with output Metrics" , optional = True )
174+ reference_package = Field [str ](
175+ doc = "A package whos version, at the time of metric upload to a "
176+ "time series database, will be converted to a timestamp of when "
177+ "that version was produced" ,
178+ default = "lsst_distrib" ,
179+ optional = True ,
180+ )
181+ timestamp_version = Field [str ](
182+ doc = "Which time stamp should be used as the reference timestamp for a "
183+ "metric in a time series database, valid values are; "
184+ "reference_package_timestamp, run_timestamp, current_timestamp, "
185+ "dataset_timestamp and explicit_timestamp:datetime where datetime is "
186+ "given in the form %Y%m%dT%H%M%S%z" ,
187+ default = "run_timestamp" ,
188+ check = _timestampValidator ,
189+ optional = True ,
190+ )
102191
103192
104193class ConsolidateResourceUsageTask (PipelineTask ):
@@ -113,6 +202,7 @@ class ConsolidateResourceUsageTask(PipelineTask):
113202 """
114203
115204 ConfigClass = ConsolidateResourceUsageConfig
205+ config : ConsolidateResourceUsageConfig
116206 _DefaultName = "consolidateResourceUsage"
117207
118208 def run (self , ** kwargs : Any ) -> Struct :
@@ -166,8 +256,18 @@ def run(self, **kwargs: Any) -> Struct:
166256 .sort_values ("task" ),
167257 memrun ,
168258 )
259+ results = Struct (output_table = memrun )
260+
261+ if self .config .do_make_metrics :
262+ bundle = _resource_table_to_bundle (
263+ memrun ,
264+ self .config .dataset_identifier ,
265+ self .config .reference_package ,
266+ self .config .timestamp_version ,
267+ )
268+ results .output_metrics = bundle
169269
170- return Struct ( output_table = memrun )
270+ return results
171271
172272
173273class GatherResourceUsageConnections (
@@ -547,6 +647,18 @@ class ResourceUsageQuantumGraphBuilder(QuantumGraphBuilder):
547647 Whether *execution* of this quantum graph will permit clobbering. If
548648 `False` (default), existing outputs in ``output_run`` are an error
549649 unless ``skip_existing_in`` will cause those quanta to be skipped.
650+ make_metric : `bool`, optional
651+ Produce a metric measurement bundle when processing the output
652+ table.
653+ timestamp_version : `str`, optional
654+ The type of timestamp used when creating a `MetricMeasurementBundle`,
655+ see there for more details.
656+ dataset_identifier: `str`, optional
657+ A processing identifier that is associated with the processing of this
658+ data, for instance "RC2_subset" for the nightly reprocessings.
659+ reference_package : `str`, optional
660+ The package used as the reference when timestamp_version is set to
661+ reference_package.
550662
551663 Notes
552664 -----
@@ -567,6 +679,10 @@ def __init__(
567679 output_run : str | None = None ,
568680 skip_existing_in : Sequence [str ] = (),
569681 clobber : bool = False ,
682+ make_metric : bool = False ,
683+ timestamp_version : str | None = None ,
684+ dataset_identifier : str | None = None ,
685+ reference_package : str | None = None ,
570686 ):
571687 # Start by querying for metadata datasets, since we'll need to know
572688 # which dataset types exist in the input collections in order to
@@ -580,6 +696,11 @@ def __init__(
580696 pipeline_graph = PipelineGraph ()
581697 metadata_refs : dict [str , set [DatasetRef ]] = {}
582698 consolidate_config = ConsolidateResourceUsageConfig ()
699+ if make_metric :
700+ consolidate_config .do_make_metrics = True
701+ consolidate_config .dataset_identifier = dataset_identifier
702+ consolidate_config .timestamp_version = timestamp_version
703+ consolidate_config .reference_package = reference_package
583704 for results in butler .registry .queryDatasets (
584705 input_dataset_types ,
585706 where = where ,
@@ -753,6 +874,37 @@ def make_argument_parser(cls) -> argparse.ArgumentParser:
753874 default = None ,
754875 metavar = "RUN" ,
755876 )
877+ parser .add_argument (
878+ "--make-metric" ,
879+ type = bool ,
880+ help = (
881+ "Turn the output resource usage table into a metric measurement bundle format compatible "
882+ "with Sasquatch."
883+ ),
884+ default = True ,
885+ metavar = "DO_MAKE_METRIC" ,
886+ )
887+ parser .add_argument (
888+ "--dataset-identifier" ,
889+ type = str ,
890+ help = "Set the dataset these results are associated with." ,
891+ default = None ,
892+ metavar = "DATASET_IDENTIFIER" ,
893+ )
894+ parser .add_argument (
895+ "--reference-package" ,
896+ type = str ,
897+ help = "Reference package to use when selecting reference timestamp" ,
898+ default = "lsst_distrib" ,
899+ metavar = "REFERENCE_PACKAGE" ,
900+ )
901+ parser .add_argument (
902+ "--timestamp-version" ,
903+ type = str ,
904+ help = "Set the dataset these results are associated with." ,
905+ default = "run_timestamp" ,
906+ metavar = "TIMESTAMP_VERSION" ,
907+ )
756908 return parser
757909
758910 @classmethod
@@ -770,13 +922,26 @@ def main(cls) -> None:
770922 raise ValueError ("At least one of --output or --output-run options is required." )
771923 args .output_run = "{}/{}" .format (args .output , Instrument .makeCollectionTimestamp ())
772924
925+ extra_args = {}
926+ if args .make_metric :
927+ if args .dataset_identifier is None or args .timestamp_version is None :
928+ raise ValueError (
929+ "If metrics are going to be created, --dataset-identifier and --timestamp-version "
930+ "must be specified."
931+ )
932+ extra_args ["make_metric" ] = True
933+ extra_args ["timestamp_version" ] = args .timestamp_version
934+ extra_args ["dataset_identifier" ] = args .dataset_identifier
935+ extra_args ["reference_package" ] = args .reference_package
936+
773937 butler = Butler (args .repo , collections = args .collections )
774938 builder = cls (
775939 butler ,
776940 dataset_type_names = args .dataset_types ,
777941 where = args .where ,
778942 input_collections = args .collections ,
779943 output_run = args .output_run ,
944+ ** extra_args ,
780945 )
781946 qg : QuantumGraph = builder .build (
782947 # Metadata includes a subset of attributes defined in CmdLineFwk.
0 commit comments