 from collections.abc import Iterable, Sequence
 from typing import Any

+import astropy.units as apu
 import numpy as np
 import pandas as pd
 from lsst.daf.butler import Butler, DatasetRef, DatasetType
 # written in task metadata were platform-dependent. Once we no longer care
 # about older runs, this import and the code that uses it can be removed.
 from lsst.utils.usage import _RUSAGE_MEMORY_MULTIPLIER
+from lsst.verify import Measurement
+
+from ..interfaces import MetricMeasurementBundle
+from ..interfaces._task import _timestampValidator

 _LOG = logging.getLogger(__name__)


+def _resource_table_to_bundle(
+    table: pd.DataFrame, dataset_identifier: str, reference_package: str, timestamp_version: str
+) -> MetricMeasurementBundle:
+    """Convert a resource usage table into a `MetricMeasurementBundle`.
+
+    See `lsst.analysis.tools.interfaces.AnalysisPipelineTask` for more
+    information on each of the following options.
+
+    Parameters
+    ----------
+    table : `DataFrame`
+        Resource usage in the form of a `DataFrame`.
+    dataset_identifier : `str`
+        The name of the data processing to associate with this metric bundle.
+    reference_package : `str`
+        The reference package to use if the timestamp version is set to a
+        package version.
+    timestamp_version : `str`
+        The type of timestamp to associate with the bundle.
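+
+    Examples
+    --------
+    A minimal sketch; the column names here are illustrative only (a real
+    table is produced by `ConsolidateResourceUsageTask`)::
+
+        table = pd.DataFrame({"task": ["isr"], "quanta": [10], "memory_GB": [2.5]})
+        bundle = _resource_table_to_bundle(
+            table, "RC2_subset", "lsst_distrib", "run_timestamp"
+        )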
91+ """
+    bundle = MetricMeasurementBundle(
+        dataset_identifier=dataset_identifier,
+        reference_package=reference_package,
+        timestamp_version=timestamp_version,
+    )
+    # Determine all the columns in the table; these will become measurements.
+    column_keys = set(table.keys())
+    # Discard the task column; the task name plays the role the AnalysisTool
+    # name normally does in the bundle.
+    column_keys.remove("task")
+    # Measurements need units; use this mapping from column-name stub to unit.
+    unit_mapping = (
+        ("quanta", apu.Unit("count")),
+        ("_hrs", apu.Unit("hour")),
+        ("_GB", apu.Unit("Gbyte")),
+        ("_s", apu.Unit("s")),
+    )
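+    # Note that matching is by substring, so, e.g., an illustrative column
+    # named "memory_GB" picks up the Gbyte unit via its "_GB" stub.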
+    # For each row, grab the task name and create a list of measurements.
+    for _, row in table.iterrows():
+        task_name = f"{row['task']}_memrun"
+        task_data = []
+        for key in column_keys:
+            unit = None
+            for stub, value in unit_mapping:
+                if stub in key:
+                    unit = value
+            if unit is None:
+                raise ValueError(f"Could not determine units for column {key} of task {row['task']}")
+            task_data.append(Measurement(key, row[key] * unit))
+        bundle[task_name] = task_data
+    return bundle
+
+
 class ConsolidateResourceUsageConnections(PipelineTaskConnections, dimensions=()):
     """Connection definitions for `ConsolidateResourceUsageTask`."""

@@ -74,6 +131,16 @@ class ConsolidateResourceUsageConnections(PipelineTaskConnections, dimensions=()
         doc="Consolidated table of resource usage statistics. One row per task label",
     )

+    output_metrics = cT.Output(
+        name="ResourceUsageSummary_metrics",
+        storageClass="MetricMeasurementBundle",
+        dimensions=(),
+        doc=(
+            "MetricMeasurementBundle with the same information as the ResourceUsageSummary in the form "
+            "required for Sasquatch dispatch"
+        ),
+    )
+
     def __init__(self, *, config):
         super().__init__(config=config)
         for name in self.config.input_names:
@@ -88,6 +155,8 @@ def __init__(self, *, config):
                 ),
             )
             self.inputs.add(name)
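+        # When metric creation is disabled, drop the corresponding output connection.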
+        if not self.config.do_make_metrics:
+            self.outputs.remove("output_metrics")


 class ConsolidateResourceUsageConfig(
@@ -99,6 +168,25 @@ class ConsolidateResourceUsageConfig(
         doc="Input resource usage dataset type names",
         default=[],
     )
+    do_make_metrics = Field[bool](doc="Make metric bundle in addition to DataFrame", default=False)
+    dataset_identifier = Field[str](doc="An identifier to be associated with output Metrics", optional=True)
+    reference_package = Field[str](
+        doc="A package whose version, at the time of metric upload to a "
+        "time series database, will be converted to a timestamp of when "
+        "that version was produced",
+        default="lsst_distrib",
+        optional=True,
+    )
+    timestamp_version = Field[str](
+        doc="Which timestamp should be used as the reference timestamp for a "
+        "metric in a time series database. Valid values are "
+        "reference_package_timestamp, run_timestamp, current_timestamp, "
+        "dataset_timestamp, and explicit_timestamp:datetime, where datetime "
+        "is given in the form %Y%m%dT%H%M%S%z.",
+        default="run_timestamp",
+        check=_timestampValidator,
+        optional=True,
+    )
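+    # Example (illustrative values): a nightly reprocessing might set
+    # dataset_identifier="RC2_subset" and leave timestamp_version at
+    # "run_timestamp", or pin an explicit time with
+    # "explicit_timestamp:20240101T000000+0000".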


 class ConsolidateResourceUsageTask(PipelineTask):
@@ -113,6 +201,7 @@ class ConsolidateResourceUsageTask(PipelineTask):
     """

     ConfigClass = ConsolidateResourceUsageConfig
+    config: ConsolidateResourceUsageConfig
     _DefaultName = "consolidateResourceUsage"

     def run(self, **kwargs: Any) -> Struct:
@@ -166,8 +255,18 @@ def run(self, **kwargs: Any) -> Struct:
             .sort_values("task"),
             memrun,
         )
+        results = Struct(output_table=memrun)
+
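+        # Optionally repackage the consolidated table as a MetricMeasurementBundle
+        # in the form required for Sasquatch dispatch.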
+        if self.config.do_make_metrics:
+            bundle = _resource_table_to_bundle(
+                memrun,
+                self.config.dataset_identifier,
+                self.config.reference_package,
+                self.config.timestamp_version,
+            )
+            results.output_metrics = bundle

-        return Struct(output_table=memrun)
+        return results


 class GatherResourceUsageConnections(
@@ -547,6 +646,18 @@ class ResourceUsageQuantumGraphBuilder(QuantumGraphBuilder):
         Whether *execution* of this quantum graph will permit clobbering. If
         `False` (default), existing outputs in ``output_run`` are an error
         unless ``skip_existing_in`` will cause those quanta to be skipped.
+    make_metric : `bool`, optional
+        Produce a metric measurement bundle when processing the output
+        table.
+    timestamp_version : `str`, optional
+        The type of timestamp used when creating a `MetricMeasurementBundle`;
+        see that class for more details.
+    dataset_identifier : `str`, optional
+        A processing identifier that is associated with the processing of this
+        data, for instance "RC2_subset" for the nightly reprocessings.
+    reference_package : `str`, optional
+        The package used as a reference when ``timestamp_version`` is set to
+        ``reference_package_timestamp``.

     Notes
     -----
@@ -567,6 +678,10 @@ def __init__(
         output_run: str | None = None,
         skip_existing_in: Sequence[str] = (),
         clobber: bool = False,
+        make_metric: bool = False,
+        timestamp_version: str | None = None,
+        dataset_identifier: str | None = None,
+        reference_package: str | None = None,
     ):
         # Start by querying for metadata datasets, since we'll need to know
         # which dataset types exist in the input collections in order to
@@ -580,6 +695,11 @@ def __init__(
         pipeline_graph = PipelineGraph()
         metadata_refs: dict[str, set[DatasetRef]] = {}
         consolidate_config = ConsolidateResourceUsageConfig()
+        if make_metric:
+            consolidate_config.do_make_metrics = True
+            consolidate_config.dataset_identifier = dataset_identifier
+            consolidate_config.timestamp_version = timestamp_version
+            consolidate_config.reference_package = reference_package
         for results in butler.registry.queryDatasets(
             input_dataset_types,
             where=where,
@@ -753,6 +873,37 @@ def make_argument_parser(cls) -> argparse.ArgumentParser:
             default=None,
             metavar="RUN",
         )
+        parser.add_argument(
+            "--make-metric",
+            action=argparse.BooleanOptionalAction,
+            help=(
+                "Turn the output resource usage table into a metric measurement bundle format compatible "
+                "with Sasquatch."
+            ),
+            default=True,
+        )
+        parser.add_argument(
+            "--dataset-identifier",
+            type=str,
+            help="Set the dataset these results are associated with.",
+            default=None,
+            metavar="DATASET_IDENTIFIER",
+        )
+        parser.add_argument(
+            "--reference-package",
+            type=str,
+            help="Reference package to use when selecting the reference timestamp.",
+            default="lsst_distrib",
+            metavar="REFERENCE_PACKAGE",
+        )
+        parser.add_argument(
+            "--timestamp-version",
+            type=str,
+            help=(
+                "Set the type of timestamp to associate with these results; see "
+                "ConsolidateResourceUsageConfig.timestamp_version for the valid values."
+            ),
+            default="run_timestamp",
+            metavar="TIMESTAMP_VERSION",
+        )
         return parser

     @classmethod
@@ -770,13 +921,26 @@ def main(cls) -> None:
                 raise ValueError("At least one of --output or --output-run options is required.")
             args.output_run = "{}/{}".format(args.output, Instrument.makeCollectionTimestamp())

+        extra_args = {}
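+        # Forward the metric-related arguments to the builder only when metrics
+        # were requested, so that the builder's defaults apply otherwise.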
+        if args.make_metric:
+            if args.dataset_identifier is None or args.timestamp_version is None:
+                raise ValueError(
+                    "If metrics are going to be created, --dataset-identifier and --timestamp-version "
+                    "must be specified."
+                )
+            extra_args["make_metric"] = True
+            extra_args["timestamp_version"] = args.timestamp_version
+            extra_args["dataset_identifier"] = args.dataset_identifier
+            extra_args["reference_package"] = args.reference_package
+
         butler = Butler(args.repo, collections=args.collections)
         builder = cls(
             butler,
             dataset_type_names=args.dataset_types,
             where=args.where,
             input_collections=args.collections,
             output_run=args.output_run,
+            **extra_args,
         )
         qg: QuantumGraph = builder.build(
             # Metadata includes a subset of attributes defined in CmdLineFwk.