From 832b3f65e51f05f0aaf2c83331061753b5bf1c35 Mon Sep 17 00:00:00 2001 From: burlingamet Date: Wed, 24 Sep 2025 12:51:04 -0600 Subject: [PATCH 01/21] change plvuio to look at calibration loader again --- pipe/pluvio/pluvio_calibration_assignment.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipe/pluvio/pluvio_calibration_assignment.yaml b/pipe/pluvio/pluvio_calibration_assignment.yaml index 0a475cb04..45db18e92 100644 --- a/pipe/pluvio/pluvio_calibration_assignment.yaml +++ b/pipe/pluvio/pluvio_calibration_assignment.yaml @@ -18,7 +18,7 @@ input: cross: - pfs: name: DIR_IN - repo: pluvio_calibration_loader_test_files + repo: pluvio_calibration_loader glob: /*/* - pfs: name: FILE_YEAR From 7b2e6c93453cb62b80230dc768b70fe1c30d7f30 Mon Sep 17 00:00:00 2001 From: burlingamet Date: Wed, 24 Sep 2025 16:27:35 -0600 Subject: [PATCH 02/21] updating dates and turning off cron so it doesn't continue to run. --- pipe/pluvio/pluvio_cron_daily_and_date_control.yaml | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/pipe/pluvio/pluvio_cron_daily_and_date_control.yaml b/pipe/pluvio/pluvio_cron_daily_and_date_control.yaml index 3c30f7417..84339a051 100644 --- a/pipe/pluvio/pluvio_cron_daily_and_date_control.yaml +++ b/pipe/pluvio/pluvio_cron_daily_and_date_control.yaml @@ -10,8 +10,8 @@ transform: # NOTE that the pluvio has different site-dates for kafka and trino loaders. # This is because some kafka topics start with dfir instead of simply , which the trino loader cannot handle LAG_DAYS_END: "2" # Default is 2. Don't go lower than 2. - START_DATE: "2025-08-20" - #END_DATE: "2024-06-01" + START_DATE: "2025-07-01" + END_DATE: "2025-09-01" OUT_PATH_KAFKA: /pfs/out/kafka OUT_PATH_TRINO: /pfs/out/trino SOURCE_TYPE: "pluvio_raw" @@ -41,16 +41,17 @@ input: # This cron is the central driver for daily scheduled updates, such as data ingest and metadata loaders. - cron: name: tick - spec: "0 7 * * *" # Run at 00:00 MST (07:00 GMT) + #spec: "0 7 * * *" # Run at 00:00 MST (07:00 GMT) + spec: "@never" overwrite: true - pfs: name: SITE_FILE_KAFKA repo: pluvio_site_list - glob: /site-list-kafka.json + glob: /site-list.json - pfs: name: SITE_FILE_TRINO repo: pluvio_site_list - glob: /site-list-trino.json + glob: /site-list.json resource_requests: memory: 100M cpu: 1 From d2b99ca3ec45f4b900ab631fed0e0cf416464b8e Mon Sep 17 00:00:00 2001 From: burlingamet Date: Wed, 15 Oct 2025 14:10:45 -0600 Subject: [PATCH 03/21] testing different source type for cal loader --- pipe/pluvio/pluvio_calibration_loader.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipe/pluvio/pluvio_calibration_loader.yaml b/pipe/pluvio/pluvio_calibration_loader.yaml index acaebcd11..d00f4f237 100644 --- a/pipe/pluvio/pluvio_calibration_loader.yaml +++ b/pipe/pluvio/pluvio_calibration_loader.yaml @@ -27,7 +27,7 @@ transform: CVAL_INGEST_BUCKET: neon-cval OUT_PATH: /tmp/out LOG_LEVEL: INFO - SOURCE_TYPE: "pluvio_raw" + SOURCE_TYPE: "pluvio" SOURCE_TYPE_OUT: "pluvio" STARTING_PATH_INDEX: "5" secrets: From 96f169f4521d567db171f9b94da06f0ddcfe234e Mon Sep 17 00:00:00 2001 From: burlingamet Date: Thu, 16 Oct 2025 17:20:36 -0600 Subject: [PATCH 04/21] change back to pluvio raw --- pipe/pluvio/pluvio_calibration_loader.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipe/pluvio/pluvio_calibration_loader.yaml b/pipe/pluvio/pluvio_calibration_loader.yaml index d00f4f237..acaebcd11 100644 --- a/pipe/pluvio/pluvio_calibration_loader.yaml +++ b/pipe/pluvio/pluvio_calibration_loader.yaml @@ -27,7 +27,7 @@ transform: CVAL_INGEST_BUCKET: neon-cval OUT_PATH: /tmp/out LOG_LEVEL: INFO - SOURCE_TYPE: "pluvio" + SOURCE_TYPE: "pluvio_raw" SOURCE_TYPE_OUT: "pluvio" STARTING_PATH_INDEX: "5" secrets: From 24ea5bd4374e78ee4566dd44db3b85217bb80001 Mon Sep 17 00:00:00 2001 From: covesturtevant Date: Fri, 17 Oct 2025 08:08:41 -0600 Subject: [PATCH 05/21] use test image for calibration loader that differentiates between sensor type and avro schema --- pipe/pluvio/pluvio_calibration_loader.yaml | 31 ++++++---------------- 1 file changed, 8 insertions(+), 23 deletions(-) diff --git a/pipe/pluvio/pluvio_calibration_loader.yaml b/pipe/pluvio/pluvio_calibration_loader.yaml index acaebcd11..a9f0df093 100644 --- a/pipe/pluvio/pluvio_calibration_loader.yaml +++ b/pipe/pluvio/pluvio_calibration_loader.yaml @@ -2,33 +2,18 @@ pipeline: name: pluvio_calibration_loader transform: - image: us-central1-docker.pkg.dev/neon-shared-service/neonscience/neon-is-cval-loader:v2.3.3 + image: us-central1-docker.pkg.dev/neon-shared-service/neonscience/neon-is-cval-loader:sha-c694dca cmd: - - sh - - "-c" - - |- - /bin/bash <<'EOF' - # Use bash-scrict mode. See http://redsymbol.net/articles/unofficial-bash-sstrict-mode/ - set -euo pipefail - IFS=$'\n\t' - - # Refresh interim directories with each datum (otherwise they persist and cause probs) - rm -rf $OUT_PATH - mkdir -p $OUT_PATH # R modules must have pfs in the repo structure - - python3 -m calval_loader.load_all_calval_files #run the calibration loader - - if [[ -d "$OUT_PATH/$SOURCE_TYPE" ]]; then - cp -r $OUT_PATH/$SOURCE_TYPE /pfs/out/$SOURCE_TYPE_OUT - fi - - EOF + - /bin/bash + stdin: + - '#!/bin/bash' + - python3 -m calval_loader.load_all_calval_files env: CVAL_INGEST_BUCKET: neon-cval - OUT_PATH: /tmp/out + OUT_PATH: /pfs/out LOG_LEVEL: INFO - SOURCE_TYPE: "pluvio_raw" - SOURCE_TYPE_OUT: "pluvio" + SOURCE_TYPE: "pluvio" + SCHEMA_NAME: "pluvio_raw" STARTING_PATH_INDEX: "5" secrets: - name: pdr-secret From 7838b5b4c487248ecfb158094969f51872980c55 Mon Sep 17 00:00:00 2001 From: burlingamet Date: Tue, 17 Mar 2026 11:38:22 -0600 Subject: [PATCH 06/21] Add WREFDFIR and change cron to current time window --- pipe/pluvio/pluvio_cron_daily_and_date_control.yaml | 4 ++-- pipe/pluvio/site-list.json | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pipe/pluvio/pluvio_cron_daily_and_date_control.yaml b/pipe/pluvio/pluvio_cron_daily_and_date_control.yaml index 4be786cb5..a8f0face8 100644 --- a/pipe/pluvio/pluvio_cron_daily_and_date_control.yaml +++ b/pipe/pluvio/pluvio_cron_daily_and_date_control.yaml @@ -10,8 +10,8 @@ transform: # NOTE that the pluvio has different site-dates for kafka and trino loaders. # This is because some kafka topics start with dfir instead of simply , which the trino loader cannot handle LAG_DAYS_END: "2" # Default is 2. Don't go lower than 2. - START_DATE: "2026-02-22" - #END_DATE: "2024-06-01" + START_DATE: "2026-03-01" + END_DATE: "2026-03-17" OUT_PATH_KAFKA: /pfs/out/kafka OUT_PATH_TRINO: /pfs/out/trino SOURCE_TYPE: "pluvio_raw" diff --git a/pipe/pluvio/site-list.json b/pipe/pluvio/site-list.json index c1d308eb3..84588793f 100644 --- a/pipe/pluvio/site-list.json +++ b/pipe/pluvio/site-list.json @@ -92,7 +92,7 @@ "kafka_start_date" : "2023-05-24" }, { - "site" : "WREF", + "site" : "WREFDFIR", "kafka_start_date" : "2023-02-03" }, { From fad5b39ba8fefa40568ecba9dbb39eba9ce3586d Mon Sep 17 00:00:00 2001 From: burlingamet Date: Wed, 8 Apr 2026 12:16:46 -0600 Subject: [PATCH 07/21] pluvio updates to only use one schema. Change logic in SUM to not have a 0 when all 30 minute points are NA --- .../wrap.precip.pluvio.flags.R | 45 ++----------------- .../wrap.precip.pluvio.stats.R | 4 +- 2 files changed, 6 insertions(+), 43 deletions(-) diff --git a/flow/flow.precip.pluvio.flags/wrap.precip.pluvio.flags.R b/flow/flow.precip.pluvio.flags/wrap.precip.pluvio.flags.R index 15bbfa0a0..4344afe3d 100644 --- a/flow/flow.precip.pluvio.flags/wrap.precip.pluvio.flags.R +++ b/flow/flow.precip.pluvio.flags/wrap.precip.pluvio.flags.R @@ -67,6 +67,8 @@ # changelog and author contributions / copyrights # Teresa Burlingame (2025-04-15) # Initial creation +# Teresa Burlingame (2026-04-08) +# removing unnecessary block and adding in schema to report write ############################################################################################## wrap.precip.pluvio.flags<- function(DirIn, DirOutBase, @@ -118,15 +120,6 @@ wrap.precip.pluvio.flags<- function(DirIn, Df=TRUE, log=log) - ## wipe preexisting schema TODO check with Cove - # Remove the "schema" attribute - #remove existing schema from plau so we can add more cols. - if (is.null(SchmQf)){ - base::attr(qfPlau, "schema") <- NULL - } else { - base::attr(qfPlau, "schema") <- SchmQf - } - # if there are no heater streams add them in as NA if(!('heater_status' %in% names(data))){ data$heater_status <- NA @@ -185,39 +178,6 @@ wrap.precip.pluvio.flags<- function(DirIn, } } } - - # "pass through" of data - # TODO ask Cove if this is necessary? - # qfs added to list of flags to process through qm module. - # - # #get file name based on date of data in directory - # nameFileOut <- fileData - # - # # Write out the time shifted dataset to file - # fileOut <- fs::path(dirOutData,nameFileOut) - # - # rptWrte <- - # base::try(NEONprocIS.base::def.wrte.parq( - # data = data, - # NameFile = fileOut, - # log=log - # ), - # silent = TRUE) - # - # if ('try-error' %in% base::class(rptWrte)) { - # log$error(base::paste0( - # 'Cannot write output to ', - # fileOut, - # '. ', - # attr(rptWrte, "condition") - # )) - # stop() - # } else { - # log$info(base::paste0( - # 'Data file written to file ', - # fileOut - # )) - # } nameFileQfOutFlag <- fileQfPlau @@ -227,6 +187,7 @@ wrap.precip.pluvio.flags<- function(DirIn, base::try(NEONprocIS.base::def.wrte.parq( data = qfPlau, NameFile = nameFileQfOutFlag, + Schm=SchmQf, log=log ), silent = TRUE) diff --git a/flow/flow.precip.pluvio.stats/wrap.precip.pluvio.stats.R b/flow/flow.precip.pluvio.stats/wrap.precip.pluvio.stats.R index 45f38d02d..81170d8bb 100644 --- a/flow/flow.precip.pluvio.stats/wrap.precip.pluvio.stats.R +++ b/flow/flow.precip.pluvio.stats/wrap.precip.pluvio.stats.R @@ -66,6 +66,8 @@ # changelog and author contributions / copyrights # Teresa Burlingame (2025-07-21) # Initial creation +# Teresa Burlingame (2025-04-08) +# change sum logic to be NA when all 30 mins are NA. ############################################################################################## wrap.precip.pluvio.stats <- function(DirIn, DirOutBase, @@ -272,7 +274,7 @@ wrap.precip.pluvio.stats <- function(DirIn, stats_30min <- stats_01min[, .( startDateTime = min(startDateTime), endDateTime = max(endDateTime), - precipBulk = sum(precipBulk, na.rm = TRUE), + precipBulk = ifelse(all(is.na(precipBulk)), NA_real_, sum(precipBulk, na.rm = TRUE)), precipBulkExpUncert = sqrt(sum(precipBulkExpUncert^2, na.rm = TRUE)) * 2, # Quadrature sum with 2x multiplier precipNumPts = sum(precipNumPts, na.rm = TRUE), # Sum the counts from 1-minute intervals nullQF = as.integer(ifelse(mean(nullQF == 1, na.rm = TRUE) >= 0.1, 1L, From 07abeb1b392fadead6f1e4634b11a872f667e719 Mon Sep 17 00:00:00 2001 From: burlingamet Date: Wed, 8 Apr 2026 15:43:52 -0600 Subject: [PATCH 08/21] update cron for more dates, sha tag testing of updates to pluvio code, delete unused pipe --- .../pluvio_cron_daily_and_date_control.yaml | 2 +- ...precipWeighingv2_analyze_pad_and_qaqc.yaml | 7 +- ...Weighingv2_qm_stats_group_and_compute.yaml | 2 +- ...cipWeighingv2_stats_group_and_compute.yaml | 101 ------------------ 4 files changed, 3 insertions(+), 109 deletions(-) delete mode 100644 pipe/precipWeighingv2/precipWeighingv2_stats_group_and_compute.yaml diff --git a/pipe/pluvio/pluvio_cron_daily_and_date_control.yaml b/pipe/pluvio/pluvio_cron_daily_and_date_control.yaml index a8f0face8..ab03f95ff 100644 --- a/pipe/pluvio/pluvio_cron_daily_and_date_control.yaml +++ b/pipe/pluvio/pluvio_cron_daily_and_date_control.yaml @@ -11,7 +11,7 @@ transform: # This is because some kafka topics start with dfir instead of simply , which the trino loader cannot handle LAG_DAYS_END: "2" # Default is 2. Don't go lower than 2. START_DATE: "2026-03-01" - END_DATE: "2026-03-17" + END_DATE: "2026-04-08" OUT_PATH_KAFKA: /pfs/out/kafka OUT_PATH_TRINO: /pfs/out/trino SOURCE_TYPE: "pluvio_raw" diff --git a/pipe/precipWeighingv2/precipWeighingv2_analyze_pad_and_qaqc.yaml b/pipe/precipWeighingv2/precipWeighingv2_analyze_pad_and_qaqc.yaml index a5297dc6a..2f9270d57 100644 --- a/pipe/precipWeighingv2/precipWeighingv2_analyze_pad_and_qaqc.yaml +++ b/pipe/precipWeighingv2/precipWeighingv2_analyze_pad_and_qaqc.yaml @@ -4,7 +4,7 @@ pipeline: transform: image_pull_secrets: - battelleecology-quay-read-all-pull-secret - image: us-central1-docker.pkg.dev/neon-shared-service/neonscience/neon-is-pluvio-flgs:v0.0.3 + image: us-central1-docker.pkg.dev/neon-shared-service/neonscience/neon-is-pluvio-flgs:sha-fad5b39 cmd: - sh - "-c" @@ -34,7 +34,6 @@ transform: DirIn=/tmp/pfs/padded_analyzerCopy \ DirOut=/tmp/pfs/plau \ DirErr=/pfs/out/errored_datums \ - FileSchmQf=$SCHEMA_FLAGS \ "TermTest1=accu_nrt:null|gap|range(rmv)" # Run third module - custom flags @@ -60,10 +59,6 @@ input: name: DATA_PATH repo: precipWeighingv2_thresh_select_ts_pad glob: /*/*/* - - pfs: - name: SCHEMA_FLAGS - repo: precipWeighingv2_avro_schemas - glob: /precipWeighingv2/flags_plausibility_precipWeighingv2.avsc - pfs: name: SCHEMA_FLAGS_CUST repo: precipWeighingv2_avro_schemas diff --git a/pipe/precipWeighingv2/precipWeighingv2_qm_stats_group_and_compute.yaml b/pipe/precipWeighingv2/precipWeighingv2_qm_stats_group_and_compute.yaml index b6b078c84..58334ac82 100644 --- a/pipe/precipWeighingv2/precipWeighingv2_qm_stats_group_and_compute.yaml +++ b/pipe/precipWeighingv2/precipWeighingv2_qm_stats_group_and_compute.yaml @@ -2,7 +2,7 @@ pipeline: name: precipWeighingv2_qm_stats_group_and_compute transform: - image: us-central1-docker.pkg.dev/neon-shared-service/neonscience/neon-is-pluvio-stats:v0.0.3 + image: us-central1-docker.pkg.dev/neon-shared-service/neonscience/neon-is-pluvio-stats:sha-fad5b39 cmd: - sh - "-c" diff --git a/pipe/precipWeighingv2/precipWeighingv2_stats_group_and_compute.yaml b/pipe/precipWeighingv2/precipWeighingv2_stats_group_and_compute.yaml deleted file mode 100644 index 858092c94..000000000 --- a/pipe/precipWeighingv2/precipWeighingv2_stats_group_and_compute.yaml +++ /dev/null @@ -1,101 +0,0 @@ ---- -pipeline: - name: precipWeighingv2_stats_group_and_compute -transform: - image: us-central1-docker.pkg.dev/neon-shared-service/neonscience/neon-is-stat-basc-grp:v2.0.4 - cmd: - - sh - - "-c" - - |- - /bin/bash <<'EOF' - # Use bash-scrict mode. See http://redsymbol.net/articles/unofficial-bash-strict-mode/ - set -euo pipefail - IFS=$'\n\t' - # Refresh interim directories with each datum (otherwise they persist and cause probs) - rm -r -f /tmp/pfs/filter_joined - mkdir -p /tmp/pfs/filter_joined - # Run first module - filter-joiner (using environment variables below as input parameters) - python3 -m filter_joiner.filter_joiner_main - # Run second module - basic stats - Rscript ./flow.stat.basc.R \ - DirIn=/tmp/pfs/filter_joined \ - DirOut=/pfs/out \ - DirErr=/pfs/out/errored_datums \ - FileSchmStat=$FILE_SCHEMA_STATS \ - "WndwAgr=030" \ - "TermStat1=accu_nrt:sum|numPts|expUncert(wrap.ucrt.dp01.cal.cnst)" - - Rscript ./flow.stat.basc.R \ - DirIn=/tmp/pfs/filter_joined \ - DirOut=/pfs/out \ - DirErr=/pfs/out/errored_datums \ - FileSchmStat=$FILE_SCHEMA_STATS_INST \ - "WndwAgr=001" \ - "TermStat1=accu_nrt:sum|expUncert(wrap.ucrt.dp01.cal.cnst)" - EOF - env: - # Environment variables for filter-joiner - CONFIG: | - --- - # In Pachyderm root will be index 0, 'pfs' index 1, and the repo name index 2. - # Metadata indices will typically begin at index 3. - # Use unix-style glob pattern to select the desired directories in each repo - input_paths: - - path: - name: QAQC_PATH - # Filter for data & uncertainty_data directories - glob_pattern: /pfs/QAQC_PATH/*/*/*/*/*/*/data/** - # Join on named location (already joined below by day) - join_indices: [6] - - path: - name: UNCERTAINTY_PATH - # Filter for data directory - glob_pattern: /pfs/UNCERTAINTY_PATH/*/*/*/*/*/*/uncertainty*/** - # Join on named location (already joined below by day) - join_indices: [6] - OUT_PATH: /tmp/pfs/filter_joined - LOG_LEVEL: INFO - RELATIVE_PATH_INDEX: "3" - LINK_TYPE: COPY # options are COPY or SYMLINK. Use COPY for combined module. - # Environment variables for calibration module - PARALLELIZATION_INTERNAL: '5' # Option for stats module -input: - cross: - - pfs: - name: FILE_SCHEMA_STATS - repo: precipWeighingv2_avro_schemas - glob: /precipWeighingv2/precipWeighingv2_dp01_stats_pluvio.avsc - - pfs: - name: FILE_SCHEMA_STATS_INST - repo: precipWeighingv2_avro_schemas - glob: /precipWeighingv2/precipWeighingv2_dp01_stats_pluvio_inst.avsc - - join: - - pfs: - name: QAQC_PATH - repo: precipWeighingv2_analyze_pad_and_qaqc - glob: /(*/*/*) - joinOn: $1 - empty_files: false # Make sure this is false for LINK_TYPE=COPY - - pfs: - name: UNCERTAINTY_PATH - repo: precipWeighingv2_group_path - glob: /(*/*/*) - joinOn: $1 - empty_files: false # Make sure this is false for LINK_TYPE=COPY -parallelism_spec: - constant: 5 -resource_requests: - memory: 1.8G - cpu: 6 -resource_limits: - memory: 3G - cpu: 7 -sidecar_resource_requests: - memory: 3G - cpu: 0.5 -autoscaling: true -datum_set_spec: - number: 1 -scheduling_spec: - node_selector: - cloud.google.com/compute-class: pach-pipeline-class From df4d5eedc62df1fad22f6577f0af92633649a6ee Mon Sep 17 00:00:00 2001 From: burlingamet Date: Wed, 8 Apr 2026 17:16:52 -0600 Subject: [PATCH 09/21] pulling cron back to have full month of data --- pipe/pluvio/pluvio_cron_daily_and_date_control.yaml | 2 +- .../precipWeighingv2_level1_group_consolidate_srf.yaml | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pipe/pluvio/pluvio_cron_daily_and_date_control.yaml b/pipe/pluvio/pluvio_cron_daily_and_date_control.yaml index ab03f95ff..043204f1d 100644 --- a/pipe/pluvio/pluvio_cron_daily_and_date_control.yaml +++ b/pipe/pluvio/pluvio_cron_daily_and_date_control.yaml @@ -10,7 +10,7 @@ transform: # NOTE that the pluvio has different site-dates for kafka and trino loaders. # This is because some kafka topics start with dfir instead of simply , which the trino loader cannot handle LAG_DAYS_END: "2" # Default is 2. Don't go lower than 2. - START_DATE: "2026-03-01" + START_DATE: "2026-02-27" END_DATE: "2026-04-08" OUT_PATH_KAFKA: /pfs/out/kafka OUT_PATH_TRINO: /pfs/out/trino diff --git a/pipe/precipWeighingv2/precipWeighingv2_level1_group_consolidate_srf.yaml b/pipe/precipWeighingv2/precipWeighingv2_level1_group_consolidate_srf.yaml index 78179bda4..c080a06f0 100644 --- a/pipe/precipWeighingv2/precipWeighingv2_level1_group_consolidate_srf.yaml +++ b/pipe/precipWeighingv2/precipWeighingv2_level1_group_consolidate_srf.yaml @@ -170,7 +170,7 @@ transform: PARALLELIZATION_INTERNAL: '2' # Environment variables for the L1 archiver - GROUP_PREFIX: par-quantum-line # no ending "_" + GROUP_PREFIX: precip-weighing-v2 # no ending "_" secrets: - name: pdr-secret @@ -183,7 +183,7 @@ input: join: - pfs: name: STATISTICS_PATH - repo: precipWeighingv2_stats_group_and_compute + repo: precipWeighingv2_qm_stats_group_and_compute glob: /(*/*/*) joinOn: $1 outer_join: true # Need outer join to pull in with or without SRFs From 7b48b6f763de20f5151f0e6bd87e26bde2f1a2d1 Mon Sep 17 00:00:00 2001 From: burlingamet Date: Thu, 16 Apr 2026 13:10:23 -0600 Subject: [PATCH 10/21] add pub pipes --- .../pipe_list_precipWeighingv2.txt | 6 +- ...ighingv2_cron_monthly_and_pub_control.yaml | 36 ++++ ...ecipWeighingv2_pub_egress_and_publish.yaml | 173 ++++++++++++++++++ ...ecipWeighingv2_pub_format_and_package.yaml | 123 +++++++++++++ .../precipWeighingv2_pub_group.yaml | 53 ++++++ 5 files changed, 390 insertions(+), 1 deletion(-) create mode 100644 pipe/precipWeighingv2/precipWeighingv2_cron_monthly_and_pub_control.yaml create mode 100644 pipe/precipWeighingv2/precipWeighingv2_pub_egress_and_publish.yaml create mode 100644 pipe/precipWeighingv2/precipWeighingv2_pub_format_and_package.yaml create mode 100644 pipe/precipWeighingv2/precipWeighingv2_pub_group.yaml diff --git a/pipe/precipWeighingv2/pipe_list_precipWeighingv2.txt b/pipe/precipWeighingv2/pipe_list_precipWeighingv2.txt index 5e5ffb148..43a827124 100644 --- a/pipe/precipWeighingv2/pipe_list_precipWeighingv2.txt +++ b/pipe/precipWeighingv2/pipe_list_precipWeighingv2.txt @@ -7,4 +7,8 @@ precipWeighingv2_threshold.yaml precipWeighingv2_thresh_select_ts_pad.yaml precipWeighingv2_analyze_pad_and_qaqc.yaml precipWeighingv2_qm_stats_group_and_compute.yaml - +precipWeighingv2_level1_group_consolidate_srf.yaml +precipWeighingv2_cron_monthly_and_pub_control.yaml +precipWeighingv2_pub_group.yaml +precipWeighingv2_pub_format_and_package.yaml +precipWeighingv2_pub_egress_and_publish.yaml \ No newline at end of file diff --git a/pipe/precipWeighingv2/precipWeighingv2_cron_monthly_and_pub_control.yaml b/pipe/precipWeighingv2/precipWeighingv2_cron_monthly_and_pub_control.yaml new file mode 100644 index 000000000..db4ddde9c --- /dev/null +++ b/pipe/precipWeighingv2/precipWeighingv2_cron_monthly_and_pub_control.yaml @@ -0,0 +1,36 @@ +--- +pipeline: + name: precipWeighingv2_cron_monthly_and_pub_control +transform: + image: us-central1-docker.pkg.dev/neon-shared-service/neonscience/neon-is-pub-cntl:v1.1.0 + cmd: ["/bin/bash"] + env: + # START_MONTH and END_MONTH indicate the date range (inclusive) to create the /Y/M folder structure + # START_DATE must be set, format "YYYY-MM" + # END_DATE can be set or unset (comment or remove line to unset). If unset, end month will be last month. + OUT_PATH: /pfs/out + START_MONTH: "2026-03" + END_MONTH: "2026-03" # Inclusive. Run the pipeline with END_MONTH set to initialize, then comment out and update pipeline (no reprocess) to let the cron take over + stdin: + - "#!/bin/bash" + - ./cron_monthly_and_pub_control/populate_pub_months.sh +input: + # Choose a monthly cron date to be something sufficiently after the 1st to allow kafka lag and timeseries pad + cron: + name: tick + spec: "@never" + #spec: "0 7 9 * *" # Run at 00:00 MST (07:00 GMT) on the 9th of the month + overwrite: true +autoscaling: true +resource_requests: + memory: 64M + cpu: 0.1 +resource_limits: + memory: 200M + cpu: 1 +sidecar_resource_requests: + memory: 200M + cpu: 0.1 +scheduling_spec: + node_selector: + cloud.google.com/compute-class: pach-pipeline-class \ No newline at end of file diff --git a/pipe/precipWeighingv2/precipWeighingv2_pub_egress_and_publish.yaml b/pipe/precipWeighingv2/precipWeighingv2_pub_egress_and_publish.yaml new file mode 100644 index 000000000..397dabe10 --- /dev/null +++ b/pipe/precipWeighingv2/precipWeighingv2_pub_egress_and_publish.yaml @@ -0,0 +1,173 @@ +--- +pipeline: + name: precipWeighingv2_pub_egress_and_publish +transform: + image: us-central1-docker.pkg.dev/neon-shared-service/neonscience/neon-is-pub-egrs-publ:v5.0.0 + cmd: + - sh + - "-c" + - |- + /bin/bash <<'EOF' + # Use bash-scrict mode. See http://redsymbol.net/articles/unofficial-bash-strict-mode/ + set -euo pipefail + IFS=$'\n\t' + + curl -o $OUT_MDP_SITES https://raw.githubusercontent.com/NEONScience/NEON-IS-data-processing-inputs/refs/heads/main/mdp_sites_list.txt + + # Run first module - pub_egress (using environment variables below as input parameters) + if [[ $(echo $DATA_PATH) ]]; then + python3 -m pub_egress.pub_egress_main + fi + # If there is output, egress it + if ls $OUT_PATH/NEON.DOM.SITE* 1> /dev/null 2>&1; then + for DIR in $OUT_PATH/NEON.DOM.SITE*; do + echo "Starting non-MDP sites==================" + echo "Syncing $DIR to bucket $BUCKET_NAME" + # Parse the product + [[ "$DIR" =~ ^$OUT_PATH/(.*)$ ]] + PRODUCT="${BASH_REMATCH[1]}" + echo "PRODUCT is $PRODUCT" + rclone \ + --no-check-dest \ + --copy-links \ + --gcs-bucket-policy-only \ + --gcs-no-check-bucket \ + copy \ + "${OUT_PATH}/${PRODUCT}" \ + ":gcs://${BUCKET_NAME}/${PRODUCT}" + done + echo "============ Done for non-MDP sites" + else + echo "No pub output to egress" + fi + + # + # Do the same for MDP sites if mdp sites exists in the output + # Check to see if the output need to be sent to the staging or not + # For example, BUCKET_NAME_MDP: neon-aa-dev-md03-staging/Publication for staging SITE=MD03 + # Read mdp_site_list from githubusercontent + # + if ls $OUT_PATH_MDP/NEON.DOM.SITE* 1> /dev/null 2>&1; then + for DIR in $OUT_PATH_MDP/NEON.DOM.SITE*; do + echo "=" + echo "Starting MDP sites==================" + # Parse the product + [[ "$DIR" =~ ^$OUT_PATH_MDP/(.*)$ ]] + PRODUCT="${BASH_REMATCH[1]}" + echo "PRODUCT is $PRODUCT" + for DIR_SUB in $DIR/MD*; do + echo "DIR is $DIR" + echo "DIR_SUB is $DIR_SUB" + # Parse the site + [[ "$DIR_SUB" =~ ^$DIR/(.*)$ ]] + SITE="${BASH_REMATCH[1]}" + # to change to lowercase in case + # export site="${SITE,,}" + # + while read -r mdpsite prod staging bucket_name + do + if [[ $SITE == $mdpsite ]] && [[ $prod == $PROD ]] && [[ $staging == $STAGING ]]; then + BUCKET_NAME_MDP=$bucket_name + echo "$mdpsite products to $bucket_name bucket" + else echo "**** No products available for $mdpsite to $bucket_name bucket" + fi + done < $OUT_MDP_SITES + echo "Syncing $SITE products directory $DIR to mdp bucket $BUCKET_NAME_MDP" + done + rclone \ + --no-check-dest \ + --copy-links \ + --gcs-bucket-policy-only \ + --gcs-no-check-bucket \ + copy \ + "${OUT_PATH_MDP}/${PRODUCT}" \ + ":gcs://${BUCKET_NAME_MDP}/${PRODUCT}" + done + echo "============ Done for MDP sites" + #cp -f "$OUT_MDP_SITES" $OUT_PATH_MDP/mdp_sites.txt + else + echo "No MDP pub output to egress" + fi + + # Run second module - pub_upload (using environment variables below as input parameters) + echo "run pub uploader ..." + export DATA_PATH=$OUT_PATH + python3 -m pub_uploader.pub_uploader_main + # Run third module - pub_sync (using environment variables below as input parameters) + echo "run pub sync sites ..." + python3 -m pub_sync.pub_sync_main + EOF + env: + LOG_LEVEL: INFO + + # Environment variables for 1st module: pub_egress. The pub bucket and egress url are specified via secrets below. + OUT_PATH: "/pfs/out" + OUT_PATH_MDP: "/pfs/out/mdp" + OUT_MDP_SITES: "/tmp/mdp_sites.txt" + # ERR_PATH can be changed, it is user specified + ERR_PATH: /pfs/out/errored_datums + STARTING_PATH_INDEX: "2" # starting path index to process pub packages. Use "2" to process the whole repo with path structure /pfs/repo_name/... + PROD: "false" # false for non-prod, true for prod + STAGING: "true" # The default is true. + + # Environment variables for 2nd module: pub_upload. + # DATA_PATH is set in the code above to the output from the egress module + # Uses STARTING_PATH_INDEX above + VERSION: 'pachyderm_v1' + CHANGE_BY: pachyderm + + # Environment variables for 3rd module: pub_sync. + # Uses DATE_PATH from input spec. DATA_PATH is set in the code above to the output from the egress module + # Uses CHANGE_BY above + DATE_PATH_YEAR_INDEX: "3" + DATE_PATH_MONTH_INDEX: "4" + DATA_PATH_PRODUCT_INDEX: "3" + DATA_PATH_SITE_INDEX: "4" + DATA_PATH_DATE_INDEX: "5" + DATA_PATH_PACKAGE_INDEX: "6" + PRODUCTS: NEON.DOM.SITE.DP1.00044.002 # CAN BE MULTIPLE, COMMA-SEPARATED + SITES: "all" # CAN BE MULTIPLE, COMMA-SEPARATED array of NEON site codes. "all" will find all sites with pub records in the database. + + secrets: + - name: pdr-secret + mount_path: /var/db_secret + - name: pub-bucket + env_var: BUCKET_NAME + key: BUCKET_NAME + - name: pub-bucket + env_var: EGRESS_URL + key: EGRESS_URL + +input: + group: + - join: + - pfs: + name: DATA_PATH + repo: precipWeighingv2_pub_format_and_package + # Glob must be at each intended pub datum (i.e. each site/year/month), grouped by month + glob: /*/*/(*/*) + joinOn: $1 + group_by: $1 + - pfs: + name: DATE_PATH + repo: precipWeighingv2_cron_monthly_and_pub_control + glob: /(*/*) + joinOn: $1 + outer_join: True # We want to run even if no data so pub_sync runs + group_by: $1 + empty_files: true +autoscaling: true +resource_requests: + memory: 500M + cpu: .5 +resource_limits: + memory: 1G + cpu: 1.3 +sidecar_resource_requests: + memory: 3G + cpu: 1.3 +datum_set_spec: + number: 1 +scheduling_spec: + node_selector: + cloud.google.com/compute-class: pach-pipeline-class diff --git a/pipe/precipWeighingv2/precipWeighingv2_pub_format_and_package.yaml b/pipe/precipWeighingv2/precipWeighingv2_pub_format_and_package.yaml new file mode 100644 index 000000000..1d58b3df9 --- /dev/null +++ b/pipe/precipWeighingv2/precipWeighingv2_pub_format_and_package.yaml @@ -0,0 +1,123 @@ +--- +pipeline: + name: precipWeighingv2_pub_format_and_package +transform: + image: us-central1-docker.pkg.dev/neon-shared-service/neonscience/neon-is-pub-grp-pack:v4.2.0 + cmd: + - sh + - "-c" + - |- + /bin/bash <<'EOF' + # Use bash-scrict mode. See http://redsymbol.net/articles/unofficial-bash-strict-mode/ + set -euo pipefail + IFS=$'\n\t' + + # Refresh interim directories with each datum (otherwise they persist and cause probs) + rm -rf $OUT_PATH_TRANSFORMER + rm -rf $OUT_PATH_PACKAGER + mkdir $OUT_PATH_TRANSFORMER + mkdir $OUT_PATH_PACKAGER + + # Set some environment variables for the pub transformer module + export DATA_PATH=$GROUPED_PATH + export OUT_PATH=$OUT_PATH_TRANSFORMER + + # Run pub_workbook_loader to load pub workbooks for pub_transformer and os_table_loader. + python3 -m pub_workbook_loader.pub_workbook_loader_main + + # Run pub_transformer (using environment variables below as input parameters) + python3 -m pub_transformer.pub_transformer_main + + # Run pub_packager. Packager needs to be run at monthly glob. Get those paths. + export OUT_PATH=$OUT_PATH_PACKAGER + product_month_paths="${OUT_PATH_TRANSFORMER}/*/*/*" + for path in $product_month_paths; do + echo "Processing product-month path $path" + export DATA_PATH=$path + python3 -m pub_packager.pub_packager_main + done + + # Clean up after pub_transformer. + rm -rf $OUT_PATH_TRANSFORMER + + # Run pub_files. + export OUT_PATH=$OUT_PATH_PUBFILES + export IN_PATH=$OUT_PATH_PACKAGER + export LOCATION_PATH=$GROUPED_PATH + python3 -m pub_files.main + + EOF + env: + # Environment variables for 2nd (part A) module: pub_workbook_loader. + OUT_PATH_WORKBOOK: "/tmp/pub_workbooks" + PRODUCTS: NEON.DOM.SITE.DP1.00044.002 # Format: NEON.DOM.SITE.DPX.XXXXX.XXX,NEON.DOM.SITE.DPX.XXXXX.XXX,etc + + # Environment variables for 2nd module (part B): pub_transformer. + LOG_LEVEL: INFO + PRODUCT_INDEX: '3' # input path index of the data product identifier. Also shared with pub_packager. + YEAR_INDEX: '4' + MONTH_INDEX: '5' + DAY_INDEX: '7' + DATA_TYPE_INDEX: '8' + GROUP_METADATA_DIR: group + DATA_PATH_PARSE_INDEX: '2' + OUT_PATH_TRANSFORMER: "/tmp/pub_transformer" + WORKBOOK_PATH: "/tmp/pub_workbooks" + + # Environment variables for 3rd module: pub_packager. Also uses PRODUCT_INDEX from pub_transformer. + OUT_PATH_PACKAGER: "/tmp/pub_packager" + ERR_PATH_PACKAGER: "/pfs/out/packager/errored_datums" + PUBLOC_INDEX: '6' # input path index of the pub package location (typically the site) + DATE_INDEX: '4' # Starting index of date in path (i.e. year index) + DATE_INDEX_LENGTH: '2' # length of date index for pub package (should be 2 for monthly) + SORT_INDEX: '10' # File name index corresponding to date field (delimiter = .) + + # Environment variables for 3rd module: pub_files. + OUT_PATH_PUBFILES: "/pfs/out" + RELATIVE_PATH_INDEX: '3' + DB_SECRETS_PATH: /var/db_secret + GITHUB_PEM_PATH: /var/github_secret/key + GITHUB_APP_ID: '300002' + GITHUB_INSTALLATION_ID: '34765458' + GITHUB_HOST: https://api.github.com + GITHUB_REPO_OWNER: NEONScience + GITHUB_README_REPO: neon-metadata-docs + GITHUB_README_PATH: readme/template.j2 + GITHUB_EML_REPO: neon-metadata-docs + GITHUB_EML_BOILERPLATE_PATH: eml/neon_components/NEON_EML_Boilerplate.xml + GITHUB_EML_CONTACT_PATH: eml/neon_components/neon_contact.xml + GITHUB_EML_INTELLECTUAL_RIGHTS_PATH: eml/neon_components/neon_intellectualRights.xml + GITHUB_EML_UNIT_TYPES_PATH: eml/neon_components/neon_unitTypes.xml + GITHUB_EML_UNITS_PATH: eml/neon_components/NEON_units.txt + GITHUB_BRANCH: main + + secrets: + - name: pdr-secret + mount_path: /var/db_secret + - name: github-neonscience-app-secret + mount_path: /var/github_secret + +input: + pfs: + name: GROUPED_PATH + repo: precipWeighingv2_pub_group + # Glob must be product-monthly or product-site-monthly. Product-site-month datums reduce unneccesary republication. + # path structure is e.g. DP1.00098.001/2023/04/CPER/04 (product/year/month/site/day) + glob: /*/*/*/* +parallelism_spec: + constant: 5 +autoscaling: true +resource_requests: + memory: 400M + cpu: 1.2 +resource_limits: + memory: 800M + cpu: 1.2 +sidecar_resource_requests: + memory: 2G + cpu: 0.4 +datum_set_spec: + number: 5 +scheduling_spec: + node_selector: + cloud.google.com/compute-class: pach-pipeline-class \ No newline at end of file diff --git a/pipe/precipWeighingv2/precipWeighingv2_pub_group.yaml b/pipe/precipWeighingv2/precipWeighingv2_pub_group.yaml new file mode 100644 index 000000000..bfaf51178 --- /dev/null +++ b/pipe/precipWeighingv2/precipWeighingv2_pub_group.yaml @@ -0,0 +1,53 @@ +--- +pipeline: + name: precipWeighingv2_pub_group +transform: +# image_pull_secrets: [battelleecology-quay-read-all-pull-secret] + image: us-central1-docker.pkg.dev/neon-shared-service/neonscience/neon-is-pub-grp-pack:v4.2.0 + cmd: ["/bin/bash"] + stdin: + - "#!/bin/bash" + - '# Run first module - pub_grouper (using environment variables below as input parameters)' + - python3 -m pub_grouper.pub_grouper_main + env: + # Environment variables for 1st module: pub_grouper. + LOG_LEVEL: INFO + OUT_PATH: "/pfs/out" + ERR_PATH_GROUPER: "pfs/out/errored_datums" + YEAR_INDEX: '3' + GROUP_INDEX: '6' + DATA_TYPE_INDEX: '7' # Also shared with pub_transform + GROUP_METADATA_DIR: group + PUBLOC_KEY: site + LINK_TYPE: SYMLINK + +input: + join: + - pfs: + name: DATA_PATH + repo: precipWeighingv2_level1_group_consolidate_srf + # Glob should be monthly and joined with pub_control to hold pub until month is likely complete + glob: /(*/*) + joinOn: $1 + - pfs: + repo: precipWeighingv2_cron_monthly_and_pub_control + glob: /(*/*) + joinOn: $1 + empty_files: true +parallelism_spec: + constant: 2 +autoscaling: true +resource_requests: + memory: 1.8G + cpu: 1 +resource_limits: + memory: 2.5G + cpu: 1.5 +sidecar_resource_requests: + memory: 4G + cpu: 1 +datum_set_spec: + number: 1 +scheduling_spec: + node_selector: + cloud.google.com/compute-class: pach-pipeline-class \ No newline at end of file From 2833cdb172f0d749a30bfe9a57765bddc37b5e2c Mon Sep 17 00:00:00 2001 From: burlingamet Date: Fri, 8 May 2026 11:11:24 -0600 Subject: [PATCH 11/21] changing flag logic to flag for cold temps rather than use heater status codes. --- .../wrap.precip.pluvio.flags.R | 36 +++++++++---------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/flow/flow.precip.pluvio.flags/wrap.precip.pluvio.flags.R b/flow/flow.precip.pluvio.flags/wrap.precip.pluvio.flags.R index 4344afe3d..38ff413de 100644 --- a/flow/flow.precip.pluvio.flags/wrap.precip.pluvio.flags.R +++ b/flow/flow.precip.pluvio.flags/wrap.precip.pluvio.flags.R @@ -5,7 +5,7 @@ #' Teresa Burlingame \email{tburlingame@battelleecology.org} \cr #' @description Workflow. Compute the heater and status flags by assessing the bit rate. Only -#' flagging alarm codes of interest. +#' flagging alarm codes of interest. Flag heater error if inlet temp is below freezing in near freezing conditions #' #' @param DirIn Character value. The input path to the data from a single source ID, structured as follows: #' #/pfs/BASE_REPO/#/yyyy/mm/dd/#/location-id, where # indicates any number of parent and child directories @@ -69,6 +69,8 @@ # Initial creation # Teresa Burlingame (2026-04-08) # removing unnecessary block and adding in schema to report write +# Teresa Burlingame (2026-05-08) +# changing test so that it looks at inlet temp rather than status ############################################################################################## wrap.precip.pluvio.flags<- function(DirIn, DirOutBase, @@ -162,23 +164,21 @@ wrap.precip.pluvio.flags<- function(DirIn, } } - #heater status for bit vals of interest - for (i in seq_along(data$heater_status)) { - if (is.na(data$heater_status[i])) { - qfPlau$heaterErrorQF[i] <- -1 - } else { - if (data$heater_status[i] == 0) { - qfPlau$heaterErrorQF[i] <- 0 - } - if ((data$heater_status[i] / 2^5) %% 2 >= 1) { #functional check failed - qfPlau$heaterErrorQF[i] <- 1 - } - if ((data$heater_status[i] / 2^7) %% 2 >= 1) { #heater deactivated or not present - qfPlau$heaterErrorQF[i] <- 1 - } - } - } - + # If inlet temperature indicates that sensor is not adequately heating + + for (i in seq_along(data$inletTemp)) { + if (is.na(data$inletTemp[i])) { + qfPlau$heaterErrorQF[i] <- -1 + } else { + if (data$inletTemp[i] > 0 ) { + qfPlau$heaterErrorQF[i] <- 0 + } + if (data$inletTemp[i] < 0 & data$cell_temperature > -10 ) { #see if there's a threshold where it stops trying to heat? + qfPlau$heaterErrorQF[i] <- 1 + } + } + } + nameFileQfOutFlag <- fileQfPlau nameFileQfOutFlag <- fs::path(dirOutQf,nameFileQfOutFlag) From 172a7d4f5e7beab62370b20a96bbb5ae13da60af Mon Sep 17 00:00:00 2001 From: Teresa Burlingame Date: Fri, 8 May 2026 10:35:23 -0700 Subject: [PATCH 12/21] updating precip flagging and adding unit tests. --- .../pluvio_CFGLOC105245_2025-03-31.parquet | Bin 0 -> 3013 bytes ...05245_2025-03-31_flagsPlausibility.parquet | Bin 0 -> 1939 bytes .../pluvio_CFGLOC105245_2025-03-31.parquet | Bin 0 -> 3311 bytes ...o_CFGLOC105245_2025-03-31_flagsCal.parquet | Bin 0 -> 1509 bytes ...05245_2025-03-31_flagsPlausibility.parquet | Bin 0 -> 2834 bytes ...GLOC105245_2025-03-31_uncertaintyCoef.json | 8 + .../testthat/test-wrap-precip-pluvio-flags.R | 181 +++++++++++++++++ .../testthat/test-wrap-precip-pluvio-stats.R | 185 ++++++++++++++++++ 8 files changed, 374 insertions(+) create mode 100644 flow/tests/testthat/pfs/precipPluvioFlags/2025/03/31/precip-weighing_SITE001/pluvio/CFGLOC105245/data/pluvio_CFGLOC105245_2025-03-31.parquet create mode 100644 flow/tests/testthat/pfs/precipPluvioFlags/2025/03/31/precip-weighing_SITE001/pluvio/CFGLOC105245/flags/pluvio_CFGLOC105245_2025-03-31_flagsPlausibility.parquet create mode 100644 flow/tests/testthat/pfs/precipPluvioStats/2025/03/31/precip-weighing_SITE001/pluvio/CFGLOC105245/data/pluvio_CFGLOC105245_2025-03-31.parquet create mode 100644 flow/tests/testthat/pfs/precipPluvioStats/2025/03/31/precip-weighing_SITE001/pluvio/CFGLOC105245/flags/pluvio_CFGLOC105245_2025-03-31_flagsCal.parquet create mode 100644 flow/tests/testthat/pfs/precipPluvioStats/2025/03/31/precip-weighing_SITE001/pluvio/CFGLOC105245/flags/pluvio_CFGLOC105245_2025-03-31_flagsPlausibility.parquet create mode 100644 flow/tests/testthat/pfs/precipPluvioStats/2025/03/31/precip-weighing_SITE001/pluvio/CFGLOC105245/uncertainty_coef/pluvio_CFGLOC105245_2025-03-31_uncertaintyCoef.json create mode 100644 flow/tests/testthat/test-wrap-precip-pluvio-flags.R create mode 100644 flow/tests/testthat/test-wrap-precip-pluvio-stats.R diff --git a/flow/tests/testthat/pfs/precipPluvioFlags/2025/03/31/precip-weighing_SITE001/pluvio/CFGLOC105245/data/pluvio_CFGLOC105245_2025-03-31.parquet b/flow/tests/testthat/pfs/precipPluvioFlags/2025/03/31/precip-weighing_SITE001/pluvio/CFGLOC105245/data/pluvio_CFGLOC105245_2025-03-31.parquet new file mode 100644 index 0000000000000000000000000000000000000000..3e63df5a39e96b144607086be1a672feedbee39a GIT binary patch literal 3013 zcma)8O>7fK6rS1GjuUL^P|wP)+{#6;az(&DBs3tZWE_H<-~`7cu@hB_%z8-zP8|G` z0GH-aAr2fkR28kFibEwX9ID=0^$=B6ML8g(UV1Q9fs_-7#sMp0-;RzFQObn!jhlm58 zpw=NE(k2i?MBCDq|U zz1Op8qBSrXU_vu@9dZisPXj1s)OQwdLiLQ$#MllXLkZceZG^@@(hSlLSlvNNOa~$j z>$x+DmivU?vfLdqIsC}cR^}G6f2Xx-CBiQ4;7tbxicd!`AS$g?H@&b+8(v@peY{KI zhro;Aqd^vMRrO_KwID#?IjRJDs^xk`Hx^flM_C#@BWZVarDm5o;HIZy6pH0~ZLzkp zY8>^a@AifIDH5U=PTd7vuP?4uYD_42xoWIc%au#DLakmkMP5@hR;iRL)8`p!^{lLw zjM|K`x^C`$x=|`E)?jZ`K(k_)V(+q1s2P>TstKq1gQ4KI#zMjq9AqwE(B_3uRKNNI zSAXK#gy!DQ%MSs#O71Wp?yt9*7K_QASxcW8MwiN9NM-uDlvA;pP zf$#D*U`NCHw#X9tABy@!(e634mr4jSz5kbG7v~3dPkI}K&14bTZAJZ?Yqwq6cTOau zW`8wWC2W^MfxVebvAnaF0uesu>NBpr>ejyL4#C&G_bbwoK!NX_@?sIR7Zx#m&ed(M zZTD!e+{gfJk2j?wLo@SMyjo>S=+C(NJJ+7|YWI4Z0o&KR(vhv{kHxm@BOuZrx%xBL z?t8SC2U?{2d1yM4?sBpH6C=hQu0H14bFcQu+hnvqF|Fs*F5Rmx$W8V6LDjFh)xQq1 zU?|{`+3dyK8S_C3Ja9#^#pS4!avD0e3sP>%IWojCZVF~+Fu#guvVMq?(GJ^wACC8&1(I{+^@&*Zs2^AY{3g<#D6 zyO}>Z6`Rjz<2piR(#N9FAy|ydE13_YQIt(jlKm5~Y`G%Q8n9=obUAh73gFaqz#^R0 zSv-*!zy{J^8jZbXkB;XydkDIa8>}wmv$G4iII_xIQ~%txrv5UXK=7d1^vNGRF;~|U zr*CHS6E`oPoXBSJ*5p+%0{xLmGpM&3`>|27-%J86@e<|-uHIV64c-EbnPTdEteB_t zV18h+B$JzFe(3ro9FsbLzA4^BJ)fJpTAVvw*Vizd^N5$i*=*zTS_VKU-ern66bm$3 z(&`+JA5WC#bJ>kzJ_|7~)fUWSz^A%`nL+~>#~blM+mmy1W0Nw`#GfTCAcxk%2fqM+ zY8`TUrI4GJH}F2x4-_ll&ZFNH2nd#?luTv`gh*}qpnO6u;`-`TDW51|5u-geRVi5@ xyk4kWuN$@F`uh6u0{oJ0hBu5#b)~!(P7Iugorndu;WvD*Kh_1Wdj|f2{~t7_6u1BY literal 0 HcmV?d00001 diff --git a/flow/tests/testthat/pfs/precipPluvioFlags/2025/03/31/precip-weighing_SITE001/pluvio/CFGLOC105245/flags/pluvio_CFGLOC105245_2025-03-31_flagsPlausibility.parquet b/flow/tests/testthat/pfs/precipPluvioFlags/2025/03/31/precip-weighing_SITE001/pluvio/CFGLOC105245/flags/pluvio_CFGLOC105245_2025-03-31_flagsPlausibility.parquet new file mode 100644 index 0000000000000000000000000000000000000000..1d89699386ea22b48da5eaa5c37ccc98e3909fc0 GIT binary patch literal 1939 zcmb_d%}*0i5P!S2%Sw7ExNozYtVvB^gRxMkK!D)kZA&WwKd4Xw7qgV6#q^7QP(w`g z$iX8A4u*KrD}R6o&xUaDva{GG}dD2E-thsAOm|aZf;V(N!{ZhPSVe5E7gfK`MgJAB-;V9Doi*$~3S@ zMP8yW04_|@PgPG6A9^(|0o-G8(90nQR8SBtNR>s{uwmK2H~}6&RH)`fL~k3hZE?ij z0TH_MIs z!}?Y+JE$x^?=!-fhfontrBb6%SV}lXPt`8vvMtM>vnwsVcgHSlZN}`=R|B<1t&+{u z9ha);hF&mRw}BB`bkvZiRO`D%71t1^f2BADk3pK*k-Rwx=9ph0P~4% zKDQ5^Xs#}bU?SS-hV7QAo=wHRabxP#r=!CFIC z4H3psTWEr?xaxqSj88>&A`v`bd5{PW3lqDG<7aNBcQeWKqs_Uy`He&($%*B!W!SD(%gxYEwp!aNmqO9;k1QM$G-qp CaX}XV literal 0 HcmV?d00001 diff --git a/flow/tests/testthat/pfs/precipPluvioStats/2025/03/31/precip-weighing_SITE001/pluvio/CFGLOC105245/data/pluvio_CFGLOC105245_2025-03-31.parquet b/flow/tests/testthat/pfs/precipPluvioStats/2025/03/31/precip-weighing_SITE001/pluvio/CFGLOC105245/data/pluvio_CFGLOC105245_2025-03-31.parquet new file mode 100644 index 0000000000000000000000000000000000000000..e152f670af44aafc29c3e96afec991da924bf8c4 GIT binary patch literal 3311 zcmb_fZERCj7``oA*Rf8z(Rf(V2?$dWQJ0{J8VyQ(&S~#<$HrWuC+l<0 zdCxh|^Pcy8dz!h3yOhu17xSW%SMyvx$K_S#avZn5wRL$Um>B2_+pJce*@lnutI)4V(|WEZG`NtG(eK|tUimIZQ(v#`)o*IHw>md-$(m0mK+gBj>X zv-q2tPB=t89-CLfjgFi4mTEZ8G5koTR(oOLB4FFQg}`eS#lXOYrNAHk7T}iOY`{Nu zlmnkLR{%9fo&fHuB0$6CYTz52YJrRIH2~j#Wj)YZ)B-&Ikq7vsvjceU#s;7><_BKV zNWhMLy}+-R1%bV%!@xVOG2p=M0pQ=;hJf1_JPXWu`+4B2gfoBf!wHqrhJs$AR0fo&c)D zr+_c2&j9ti&jI%=`3(5z$Dacay1vNN>Pl{2+Pmn0_UPbcm@0Cv0#A=z2iDrY0bV@! zU8Z)Gr~L;O-mk{TX^*T$`1oW&7`0q_1*3E-1g>}nE6e4oYpK%EHXkv@TXIel8f=sjpX4ws2WsK0bl4uM-#7K0~6ZV z+EkBqjxXekr#P100mo8pm^n7h${fqDPSW^RV5po7gVH*KzcVR}K2TwFLSa;) zps>zOVTKsX3FKr2PLWDPUtS-cu3$P;#u+uSK(7??^Z6XFD^eS?W8p+JAa(Y26>&}) zIcCj@^~II84D8fJC4W~q5$}xmg`^^bb7!8pgaw+Pk5#Qd5J+_PN8=o?rki6@e=Hnr ziTmS;Sjv=}GSx(*;b`+Zj-ATx>kmrtRw)!o&Gi8(80?H=E=6%NDy7W1y^=pJMLT0D zIw5H?oA!&Gnb(=hIE~(u20ob~9|2zzxVjY;H^ zxSdi&4hoaltrA95;;>pguc9LIBdvU`rxSmwnL?bA_I^ra8T*B7;rA?YTr1vIn~~AE zzbex-mnd&CKP@EpwR}5eDKNp;Rlddpg8FLeuT38B&9S@0Px&K8$j@M+FGWL@y;jBtLv_Smpp^1PgUk}AMzETdO zq4Hk?CaX>R5-3D}){CS1Nm0dvq7YSB=PWw)E2dC9mM?D4pHNgjFq4W(>#T~x6@!oz zwZiB^&SW;~h}*s1TakK@j0ZFp+Db_kA*2}}+8PKMB=#P%j*uQw)x&IdJ0Xp9gpGBy z$6W3)+l)_jj55SX4Iz$ZEMtgyIR>O*5MWn8=3$07*1Y=P;cN*Q*fgr*$tj} z09=eG)J9iG1AJiTd#bGW`4gb)3V#^2-&+>z@VVPMyroo?cqh(RZ=N{cOIHv)G{t## zpMcGi2-sF{b^GeKZd_6CcGF#ro8Sb!53#3$I+UK*R5AZkNnk^|iRK4ehdR7vL*Qub za;&p<`B-{relX}E_Q6zs*!C8R_5es{i#L;Q_cm?n@~ln-`e``ZDW8M(sa8ieyFkL? z9cJ-Hv7oUb8&y-Rtq-<)-2+`dH)0-)cchlV&*}<23k^2brQ<_=w|hJ_jl?#=-^NZs z9Bq;x_X2-Y1i9??dz(p;?nnKgSi##)@67^%V9AHXA9CI zrv50762ch^BL3)>gcM&Eh(wn8@k=^bFd#)^ec}EB+sftEFuoz*gDVUUbq`kY^*HRwp*o^x_NTjq{BqpuBNtF^e)MoO0vWx zZ7!9$4IC&`#Z6Q?DZb##rW8Rz8Jnok4~o<=KPdP%oqI3HV3X~`WdCz-#7Zq}huq&e z=Q-#6|Ic|2q?NVTP$#`mOdl2nsY%IfLcW{fB^M^teKjH>fe&9UlBDsM=7Ot_%?EEc zRDzN5SHWM6#o)$2Yr(&^E&)H#8bI;XGH|=jz&SUXz|S_cfb*WL06*Wg3aqYp8$5e1 z489TU2FLF8fWfQ*UKe9v_i!A%UzGsk7gOM)jx0Fv^8onIra^Gatj%EQ;g7&imV6AV zSGIx&dWOISzwZP`w|olLDSN@MP7H(kni9MN)mZ>ldrMcP1uyPYF%aoEP;kqaqbGMIwb2j0#U7lgbxF%gYHN^|uAk zbcp54-L57>YdPc%e&5(SKhzQHS#vbG_NVvPSMG@Rjl{ngP5gE~?Y)*u-ZMuZ;S8Q$ zqOKqntcQ2H96UzoY${Jm#>u0KMI-&&yFkHs$dW`EjvlNc2em?-^7tiV0JG<1%!O3q{pKOk~FW zgo59s1TDAS4tc-Bb5!y?Dc0-~PgCgsCjZ=ZCmCES@eG&1fEQ|-+D~sFKQ5I`k0g)I zB&w#kSbO`b&IbEclnG%mZ~0yyW2_AyZz~wfGhZ+BGuF%WUMuFqh7cd=jJ?Hs%%Ap{ z@g?0lZ;Np1Auergje}eajT;r3r5AZEJQxXtH$2L8a?z>z zCd2XWz|xwie`&SRS(D&}H-nH?{K5YVzKT^; literal 0 HcmV?d00001 diff --git a/flow/tests/testthat/pfs/precipPluvioStats/2025/03/31/precip-weighing_SITE001/pluvio/CFGLOC105245/flags/pluvio_CFGLOC105245_2025-03-31_flagsPlausibility.parquet b/flow/tests/testthat/pfs/precipPluvioStats/2025/03/31/precip-weighing_SITE001/pluvio/CFGLOC105245/flags/pluvio_CFGLOC105245_2025-03-31_flagsPlausibility.parquet new file mode 100644 index 0000000000000000000000000000000000000000..f95b5e44ae5503488765451ccabc91869351bbc8 GIT binary patch literal 2834 zcmdUxdu&rx9LMkK+I3rD9desCsiqiq9igLLb+|3~IDM^yGS;zmtZ~7+uC{}|=zAl+ z5=_txjE?}BWI+w{nT3cELzDo?{*ahq;17%-pyC5vBqnOqMB?|H+m2RdMBP8!w4Zx^ zzjMy-c~06+($&b9^G7TAOL8}#|#Jx)eOu zS`YfCmxJGlCUDEo7Vy{YE5XO~t)Sw-z2FWT0T-XKgD-A!fp=V81HS(3I?!CV9z67( z7rf8y1E((ZgYKjVo>l}v-&hFzydeyRj>W($-AOR>Z3g^f>j=25=25Wvl_$XGRz3;p zPHYGF^^byg{`d?yx$SwdNxK_-|IIPbcKH_pe%L-d`fm4RPUMscQ5-8y#0fZ!2O*c%atnQrPF(s>{A}ho`I^h`Yd>C z{5aXS%yh5*4a`_pI@~zNt^>b)zYdMZ< zJ|_cnWs2Gd3@W>*cfD+Mf4Qo2>Dul<|AvXkgWo*zQ2oxqq4Cfgli}}=#EmCYkqf!W zE7*h9D|wZhm$Al|tDvI6EaEjq!IV?2mJvY|Nz8erL2_P}yQ!>|&6z3HYRf5Urq`sK zIa4I_zj~_M+r()VNq;C15qUM5Tqo125&>~AmQHO<4MhTVi`}EOdL#A2@iNRRqUmth z?BoeViVc&vKKuiT@ zb()+`uNxIOJzuVC;uLCKX>`0ylhaIBYJRB@Miqkc-`y>p5jO;R&;qLQTD44}6Sy1_ zrT%1jfBAfwrx(gJd&`6grEs-EFWs5Hxj8pxx=_6$+s7+~vC8?dE%lbH8+O@jwlHFKQO3q1UT;pv6*;-TaW7kQYj z5hiN>s|U@ewVIru)EriGI=xm!y1Lf&wn{fwdr@wp&oHqOLOl4=XAL1)Vhs`}Awgmb zGM&{*$XYtW!gl(GTzEsGi)@>zgPT{v%6n^bRFl sensorErrorQF=0, heaterErrorQF=0 +#' Row 2: sensorStatus=NA, inletTemp=NA -> sensorErrorQF=-1, heaterErrorQF=-1 +#' Row 3: sensorStatus=64, inletTemp=-2.0 -> sensorErrorQF=1, heaterErrorQF=1 +#' Row 4: sensorStatus=128, inletTemp=5.0 -> sensorErrorQF=1, heaterErrorQF=0 +#' Row 5: sensorStatus=256, inletTemp=5.0 -> sensorErrorQF=1, heaterErrorQF=0 +#' Row 6: sensorStatus=512, inletTemp=5.0 -> sensorErrorQF=1, heaterErrorQF=0 +#' Row 7: sensorStatus=1024, inletTemp=5.0 -> sensorErrorQF=1, heaterErrorQF=0 +#' Row 8: sensorStatus=0, inletTemp=-2.0 -> sensorErrorQF=0, heaterErrorQF=1 +#' Row 9: sensorStatus=0, inletTemp=5.0 -> sensorErrorQF=0, heaterErrorQF=0 +#' Row 10: sensorStatus=0, inletTemp=5.0 -> sensorErrorQF=0, heaterErrorQF=0 + +# changelog and author contributions / copyrights +# Teresa Burlingame (2026-05-08) +# Initial creation +############################################################################################## +library(testthat) + +context("\n | Unit test of wrap.precip.pluvio.flags for NEON IS data processing \n") + +test_that("Unit test of wrap.precip.pluvio.flags.R", { + source('../../flow.precip.pluvio.flags/wrap.precip.pluvio.flags.R') + + DirFixture <- file.path(getwd(), 'pfs/precipPluvioFlags') + DirOutBase <- tempfile(pattern = 'precip_pluvio_flags_test_out') + + # Clean up temp output after test completes + on.exit(if (dir.exists(DirOutBase)) unlink(DirOutBase, recursive = TRUE), add = TRUE) + + DirIn <- file.path(DirFixture, '2025/03/31/precip-weighing_SITE001/pluvio/CFGLOC105245') + + # -------------------------------------------------------------------------------------------- + # Test 1: Happy path - output flags directory is created + # -------------------------------------------------------------------------------------------- + wrap.precip.pluvio.flags( + DirIn = DirIn, + DirOutBase = DirOutBase + ) + + DirOut <- file.path(DirOutBase, '2025/03/31/precip-weighing_SITE001/pluvio/CFGLOC105245') + testthat::expect_true(dir.exists(file.path(DirOut, 'flags'))) + + # -------------------------------------------------------------------------------------------- + # Test 2: Output flagsPlausibility file is written + # -------------------------------------------------------------------------------------------- + out_flags <- list.files(file.path(DirOut, 'flags'), pattern = 'flagsPlausibility\\.parquet$', + full.names = TRUE) + testthat::expect_equal(length(out_flags), 1) + + # -------------------------------------------------------------------------------------------- + # Test 3: Output file contains heaterErrorQF and sensorErrorQF columns + # -------------------------------------------------------------------------------------------- + qfOut <- NEONprocIS.base::def.read.parq.ds( + fileIn = out_flags, + VarTime = 'readout_time', + RmvDupl = TRUE, + Df = TRUE + ) + + testthat::expect_true('heaterErrorQF' %in% names(qfOut)) + testthat::expect_true('sensorErrorQF' %in% names(qfOut)) + + # -------------------------------------------------------------------------------------------- + # Test 4: sensorErrorQF is -1 when sensorStatus is NA (row 2) + # -------------------------------------------------------------------------------------------- + testthat::expect_equal(qfOut$sensorErrorQF[2], -1L) + + # -------------------------------------------------------------------------------------------- + # Test 5: sensorErrorQF is 0 when sensorStatus is 0 (row 1) + # -------------------------------------------------------------------------------------------- + testthat::expect_equal(qfOut$sensorErrorQF[1], 0L) + + # -------------------------------------------------------------------------------------------- + # Test 6: sensorErrorQF is 1 when bit 6 (unstable) is set (row 3, sensorStatus=64) + # -------------------------------------------------------------------------------------------- + testthat::expect_equal(qfOut$sensorErrorQF[3], 1L) + + # -------------------------------------------------------------------------------------------- + # Test 7: sensorErrorQF is 1 when bit 7 (defective) is set (row 4, sensorStatus=128) + # -------------------------------------------------------------------------------------------- + testthat::expect_equal(qfOut$sensorErrorQF[4], 1L) + + # -------------------------------------------------------------------------------------------- + # Test 8: sensorErrorQF is 1 when bit 8 (weight < min) is set (row 5, sensorStatus=256) + # -------------------------------------------------------------------------------------------- + testthat::expect_equal(qfOut$sensorErrorQF[5], 1L) + + # -------------------------------------------------------------------------------------------- + # Test 9: sensorErrorQF is 1 when bit 9 (weight > max) is set (row 6, sensorStatus=512) + # -------------------------------------------------------------------------------------------- + testthat::expect_equal(qfOut$sensorErrorQF[6], 1L) + + # -------------------------------------------------------------------------------------------- + # Test 10: sensorErrorQF is 1 when bit 10 (no calibration) is set (row 7, sensorStatus=1024) + # -------------------------------------------------------------------------------------------- + testthat::expect_equal(qfOut$sensorErrorQF[7], 1L) + + # -------------------------------------------------------------------------------------------- + # Test 11: heaterErrorQF is -1 when inletTemp is NA (row 2) + # -------------------------------------------------------------------------------------------- + testthat::expect_equal(qfOut$heaterErrorQF[2], -1L) + + # -------------------------------------------------------------------------------------------- + # Test 12: heaterErrorQF is 0 when inletTemp > 0 (row 1, inletTemp=5.0) + # -------------------------------------------------------------------------------------------- + testthat::expect_equal(qfOut$heaterErrorQF[1], 0L) + + # -------------------------------------------------------------------------------------------- + # Test 13: heaterErrorQF is 1 when inletTemp < 0 and cell is near freezing (row 3, inletTemp=-2) + # -------------------------------------------------------------------------------------------- + testthat::expect_equal(qfOut$heaterErrorQF[3], 1L) + + # -------------------------------------------------------------------------------------------- + # Test 14: heaterErrorQF is 1 when sensorStatus=0 but inletTemp<0 (row 8, inletTemp=-2) + # -------------------------------------------------------------------------------------------- + testthat::expect_equal(qfOut$heaterErrorQF[8], 1L) + + # -------------------------------------------------------------------------------------------- + # Test 15: Original flag columns are preserved in output (nullQF, rangeQF, gapQF) + # -------------------------------------------------------------------------------------------- + testthat::expect_true('nullQF' %in% names(qfOut)) + testthat::expect_true('rangeQF' %in% names(qfOut)) + testthat::expect_true('gapQF' %in% names(qfOut)) + + # -------------------------------------------------------------------------------------------- + # Test 16: All flag values are in the expected set {-1, 0, 1} + # -------------------------------------------------------------------------------------------- + testthat::expect_true(all(qfOut$sensorErrorQF %in% c(-1L, 0L, 1L))) + testthat::expect_true(all(qfOut$heaterErrorQF %in% c(-1L, 0L, 1L))) + + # -------------------------------------------------------------------------------------------- + # Test 17: DirSubCopy carries additional directories through to output + # -------------------------------------------------------------------------------------------- + DirOutBase2 <- tempfile(pattern = 'precip_pluvio_flags_test_out2') + on.exit(if (dir.exists(DirOutBase2)) unlink(DirOutBase2, recursive = TRUE), add = TRUE) + + wrap.precip.pluvio.flags( + DirIn = DirIn, + DirOutBase = DirOutBase2, + DirSubCopy = 'data' + ) + + DirOut2 <- file.path(DirOutBase2, '2025/03/31/precip-weighing_SITE001/pluvio/CFGLOC105245') + testthat::expect_true(dir.exists(file.path(DirOut2, 'flags'))) + testthat::expect_true(dir.exists(file.path(DirOut2, 'data'))) + + # -------------------------------------------------------------------------------------------- + # Test 18: Invalid input directory produces an error + # -------------------------------------------------------------------------------------------- + DirIn_bad <- file.path(DirFixture, '2025/03/31/precip-weighing_SITE001/pluvio/CFGLOC_DOES_NOT_EXIST') + + rpt <- try( + wrap.precip.pluvio.flags( + DirIn = DirIn_bad, + DirOutBase = DirOutBase + ), + silent = TRUE + ) + + testthat::expect_true('try-error' %in% class(rpt)) +}) diff --git a/flow/tests/testthat/test-wrap-precip-pluvio-stats.R b/flow/tests/testthat/test-wrap-precip-pluvio-stats.R new file mode 100644 index 000000000..e9f636582 --- /dev/null +++ b/flow/tests/testthat/test-wrap-precip-pluvio-stats.R @@ -0,0 +1,185 @@ +############################################################################################## +#' @title Unit tests for wrap.precip.pluvio.stats + +#' @author +#' Teresa Burlingame \email{tburlingame@battelleecology.org} + +#' @description +#' Run unit tests for wrap.precip.pluvio.stats.R. +#' The tests include positive and negative scenarios. +#' Positive tests verify that 1-minute and 30-minute output files are created with the +#' expected columns, that uncertainty is applied correctly, that 30-minute aggregation +#' sums precipitation across 1-minute intervals, and that finalQF reflects the component flags. +#' The negative test verifies that an invalid input directory produces an error. +#' +#' Fixture data expected at: +#' pfs/precipPluvioStats/2025/03/31/precip-weighing_SITE001/pluvio/CFGLOC105245/ +#' data/ - pluvio_CFGLOC105245_2025-03-31.parquet (60 rows, 1 hr) +#' flags/ - pluvio_CFGLOC105245_2025-03-31_flagsCal.parquet +#' pluvio_CFGLOC105245_2025-03-31_flagsPlausibility.parquet +#' uncertainty_coef/ - pluvio_CFGLOC105245_2025-03-31_uncertaintyCoef.json +#' (U_CVALA1 = 0.02) + +# changelog and author contributions / copyrights +# Teresa Burlingame (2026-05-08) +# Initial creation +############################################################################################## +library(testthat) + +context("\n | Unit test of wrap.precip.pluvio.stats for NEON IS data processing \n") + +test_that("Unit test of wrap.precip.pluvio.stats.R", { + source('../../flow.precip.pluvio.stats/wrap.precip.pluvio.stats.R') + + DirFixture <- file.path(getwd(), 'pfs/precipPluvioStats') + DirOutBase <- tempfile(pattern = 'precip_pluvio_stats_test_out') + + # Clean up temp output after test completes + on.exit(if (dir.exists(DirOutBase)) unlink(DirOutBase, recursive = TRUE), add = TRUE) + + DirIn <- file.path(DirFixture, '2025/03/31/precip-weighing_SITE001/pluvio/CFGLOC105245') + + # -------------------------------------------------------------------------------------------- + # Test 1: Happy path - output stats directory is created + # -------------------------------------------------------------------------------------------- + wrap.precip.pluvio.stats( + DirIn = DirIn, + DirOutBase = DirOutBase, + DirSubCopy = 'location' + ) + + DirOut <- file.path(DirOutBase, '2025/03/31/precip-weighing_SITE001/pluvio/CFGLOC105245') + testthat::expect_true(dir.exists(file.path(DirOut, 'stats'))) + + # -------------------------------------------------------------------------------------------- + # Test 2: Both 1-min and 30-min output files are written + # -------------------------------------------------------------------------------------------- + stats_files <- list.files( + file.path(DirOut, 'stats'), + pattern = '_stats_(001|030)\\.parquet$', + full.names = TRUE + ) + + testthat::expect_equal(length(stats_files), 2) + + file_001 <- stats_files[grepl('_stats_001\\.parquet$', stats_files)] + file_030 <- stats_files[grepl('_stats_030\\.parquet$', stats_files)] + testthat::expect_equal(length(file_001), 1) + testthat::expect_equal(length(file_030), 1) + + # -------------------------------------------------------------------------------------------- + # Test 3: 1-min output schema contains all expected columns + # -------------------------------------------------------------------------------------------- + stats_001 <- NEONprocIS.base::def.read.parq.ds( + fileIn = file_001, + VarTime = 'startDateTime', + RmvDupl = TRUE, + Df = TRUE + ) + + expected_cols <- c( + 'startDateTime', 'endDateTime', + 'precipBulk', 'precipBulkExpUncert', 'precipNumPts', + 'nullQF', 'gapQF', 'extremePrecipQF', + 'heaterErrorQF', 'sensorErrorQF', + 'validCalQF', 'suspectCalQF', 'finalQF' + ) + + testthat::expect_true(all(expected_cols %in% names(stats_001))) + + # -------------------------------------------------------------------------------------------- + # Test 4: 30-min output schema contains all expected columns + # -------------------------------------------------------------------------------------------- + stats_030 <- NEONprocIS.base::def.read.parq.ds( + fileIn = file_030, + VarTime = 'startDateTime', + RmvDupl = TRUE, + Df = TRUE + ) + + testthat::expect_true(all(expected_cols %in% names(stats_030))) + + # -------------------------------------------------------------------------------------------- + # Test 5: 30-min output has fewer rows than 1-min output + # -------------------------------------------------------------------------------------------- + testthat::expect_lt(nrow(stats_030), nrow(stats_001)) + + # -------------------------------------------------------------------------------------------- + # Test 6: precipBulk values are non-negative + # -------------------------------------------------------------------------------------------- + testthat::expect_true(all(stats_001$precipBulk >= 0, na.rm = TRUE)) + testthat::expect_true(all(stats_030$precipBulk >= 0, na.rm = TRUE)) + + # -------------------------------------------------------------------------------------------- + # Test 7: precipBulkExpUncert meets the minimum 0.1 mm manufacturer accuracy spec + # -------------------------------------------------------------------------------------------- + testthat::expect_true(all(stats_001$precipBulkExpUncert >= 0.1, na.rm = TRUE)) + + # -------------------------------------------------------------------------------------------- + # Test 8: 30-min precipBulk is the sum of the corresponding 1-min precipBulk values + # -------------------------------------------------------------------------------------------- + # First 30-min interval covers rows 1-30 of 1-min stats + first_30_sum <- sum(stats_001$precipBulk[1:30], na.rm = TRUE) + testthat::expect_equal(stats_030$precipBulk[1], first_30_sum, tolerance = 1e-8) + + # -------------------------------------------------------------------------------------------- + # Test 9: finalQF is 0 when all component flags are 0 (fixture has no errors) + # -------------------------------------------------------------------------------------------- + testthat::expect_true(all(stats_001$finalQF == 0L, na.rm = TRUE)) + testthat::expect_true(all(stats_030$finalQF == 0L, na.rm = TRUE)) + + # -------------------------------------------------------------------------------------------- + # Test 10: Quality flag values are within the expected set {-1, 0, 1} + # -------------------------------------------------------------------------------------------- + qf_cols <- c('nullQF', 'gapQF', 'extremePrecipQF', 'heaterErrorQF', + 'sensorErrorQF', 'finalQF') + + for (col in qf_cols) { + testthat::expect_true( + all(stats_001[[col]] %in% c(-1L, 0L, 1L), na.rm = TRUE), + info = paste0('Column ', col, ' in 1-min output has unexpected values') + ) + testthat::expect_true( + all(stats_030[[col]] %in% c(-1L, 0L, 1L), na.rm = TRUE), + info = paste0('Column ', col, ' in 30-min output has unexpected values') + ) + } + + # -------------------------------------------------------------------------------------------- + # Test 11: precipNumPts equals 1 for each non-NA 1-min row and sums to 30 per 30-min interval + # -------------------------------------------------------------------------------------------- + testthat::expect_true(all(stats_001$precipNumPts %in% c(0L, 1L))) + testthat::expect_equal(stats_030$precipNumPts[1], 30L) + + # -------------------------------------------------------------------------------------------- + # Test 12: DirSubCopy carries additional directories through to output + # -------------------------------------------------------------------------------------------- + DirOutBase2 <- tempfile(pattern = 'precip_pluvio_stats_test_out2') + on.exit(if (dir.exists(DirOutBase2)) unlink(DirOutBase2, recursive = TRUE), add = TRUE) + + wrap.precip.pluvio.stats( + DirIn = DirIn, + DirOutBase = DirOutBase2, + DirSubCopy = c('flags', 'uncertainty_coef') + ) + + DirOut2 <- file.path(DirOutBase2, '2025/03/31/precip-weighing_SITE001/pluvio/CFGLOC105245') + testthat::expect_true(dir.exists(file.path(DirOut2, 'stats'))) + testthat::expect_true(dir.exists(file.path(DirOut2, 'flags'))) + testthat::expect_true(dir.exists(file.path(DirOut2, 'uncertainty_coef'))) + + # -------------------------------------------------------------------------------------------- + # Test 13: Invalid input directory produces an error + # -------------------------------------------------------------------------------------------- + DirIn_bad <- file.path(DirFixture, '2025/03/31/precip-weighing_SITE001/pluvio/CFGLOC_DOES_NOT_EXIST') + + rpt <- try( + wrap.precip.pluvio.stats( + DirIn = DirIn_bad, + DirOutBase = DirOutBase + ), + silent = TRUE + ) + + testthat::expect_true('try-error' %in% class(rpt)) +}) From 5ad3b3bbf5d53c108fce77b46d1c354f2f0328d0 Mon Sep 17 00:00:00 2001 From: Teresa Burlingame Date: Fri, 8 May 2026 10:40:44 -0700 Subject: [PATCH 13/21] strip stale schema if schemaQF is null --- flow/flow.precip.pluvio.flags/wrap.precip.pluvio.flags.R | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/flow/flow.precip.pluvio.flags/wrap.precip.pluvio.flags.R b/flow/flow.precip.pluvio.flags/wrap.precip.pluvio.flags.R index 38ff413de..d0fc94ca3 100644 --- a/flow/flow.precip.pluvio.flags/wrap.precip.pluvio.flags.R +++ b/flow/flow.precip.pluvio.flags/wrap.precip.pluvio.flags.R @@ -183,6 +183,12 @@ wrap.precip.pluvio.flags<- function(DirIn, nameFileQfOutFlag <- fs::path(dirOutQf,nameFileQfOutFlag) + # Strip inherited parquet schema attribute so def.wrte.parq does not error + # on a column count mismatch after heaterErrorQF and sensorErrorQF were added. + if(base::is.null(SchmQf)){ + attr(qfPlau, 'schema') <- NULL + } + rptWrte <- base::try(NEONprocIS.base::def.wrte.parq( data = qfPlau, From 0c5951a16aefd276d434693dcda1b4fb76007ec5 Mon Sep 17 00:00:00 2001 From: Teresa Burlingame Date: Fri, 8 May 2026 10:42:29 -0700 Subject: [PATCH 14/21] didn't indext --- flow/flow.precip.pluvio.flags/wrap.precip.pluvio.flags.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flow/flow.precip.pluvio.flags/wrap.precip.pluvio.flags.R b/flow/flow.precip.pluvio.flags/wrap.precip.pluvio.flags.R index d0fc94ca3..58315df3a 100644 --- a/flow/flow.precip.pluvio.flags/wrap.precip.pluvio.flags.R +++ b/flow/flow.precip.pluvio.flags/wrap.precip.pluvio.flags.R @@ -173,7 +173,7 @@ wrap.precip.pluvio.flags<- function(DirIn, if (data$inletTemp[i] > 0 ) { qfPlau$heaterErrorQF[i] <- 0 } - if (data$inletTemp[i] < 0 & data$cell_temperature > -10 ) { #see if there's a threshold where it stops trying to heat? + if (data$inletTemp[i] < 0 & data$cell_temperature[i] > -10 ) { #see if there's a threshold where it stops trying to heat? qfPlau$heaterErrorQF[i] <- 1 } } From 7cf1c96e5071c804baf3c48a31d1ae178c522d1c Mon Sep 17 00:00:00 2001 From: Teresa Burlingame Date: Fri, 8 May 2026 10:52:45 -0700 Subject: [PATCH 15/21] data fix --- .../pluvio_CFGLOC105245_2025-03-31.parquet | Bin 3013 -> 2694 bytes .../pluvio_CFGLOC105245_2025-03-31.parquet | Bin 3311 -> 2993 bytes 2 files changed, 0 insertions(+), 0 deletions(-) diff --git a/flow/tests/testthat/pfs/precipPluvioFlags/2025/03/31/precip-weighing_SITE001/pluvio/CFGLOC105245/data/pluvio_CFGLOC105245_2025-03-31.parquet b/flow/tests/testthat/pfs/precipPluvioFlags/2025/03/31/precip-weighing_SITE001/pluvio/CFGLOC105245/data/pluvio_CFGLOC105245_2025-03-31.parquet index 3e63df5a39e96b144607086be1a672feedbee39a..2248d37be818242b448f2b34f05b579fb531c939 100644 GIT binary patch delta 541 zcmX>q-X`i0;22~m$|5QvD&Zr_B+4Mfz`&)##=yW(kW*TgnJ>y9DkLhV1C$mJWfNtS zF=NnRgUCog>5ZW;7&mt_88PzCF=dcoD^AWx%}o>)n0$&kXR;TIJ!8-0E*5E-35;Sh z7}adlIIx&7S&>78NvvY>GZqcjYm90O7$^2iOx9*~WO>G{wr_F)YbMJbR<(}FcUf~; zPqC~0Vw>#GW;6LMn=ES=r`jow$^7gEte3dd7I96MX4adm!XY~OIJ*|>G#<5Q+>>j8 zB0OxOnoK}%F^J9M6%$e85L?E}ASolk;}{g=A8rM4uf!QveGsUaoW$XfmhR~2=v?j? z=;)Z@=n7<}J2^VKlmi7Fi$H`2NZ!!`L;xim9X(yN_4PqqR~Mjapn4Di(g*_vK!#I( zRJfZ#RD@SrvT;yGibq)0I8NmlC2=4oeGk2D{@lYf^w1~Kqfl}!}U8=0hyb>aW*nC+D@+F URslxTLT(9826c7@22h#+0Nc@#`Tzg` delta 851 zcmaiz&u1F!hO`o}KVVaSNMe+3DPUV^SgFOt z=)u&BhbH5}0|yU1_M*KQk0u`Uq<0dp#CYr<;0&!s{1GRc@4WYB_Pd|=lHG&v&lYmn zh(H0JhiC!v5O@LbBozR_^2*B8_GV|N*@}k45I`?fC!r6zp#-Hs9E6mS3)*Pp_IJoLaGlEPK9XlbMc5b6fP592;+}+Fi{v3UMG`hhzDRa*AO^)_ z8#=I&4`O1LKgAiv9U?vF)EZeTk_Ec#(2)cd5kSQk8wJm zqJ?1|rvbixpGxX73v*1^MV!j$JG#ySyvu61&V;>qx|Y~pO&1$$>0-A$y%fGtHXd^(!>U;Pu>$uFtSyev!H2ER*2}h5^H&Kuepe>D_f|bkC$_L!+$%CPDceXA-x= zw^l91tV-D^R7w$MhD(%9XHk7_<0of>o&25Cj21g)bhc}hv)$F{tYNTj7Vl6K#z~u{ ca`Z-i%84w+B!TWt=}@2U=;-KN?ilFknBwRPWTrbgI=Ykt1s#h( zga=68(E>yOB^(_+U9|P}L0nfCplYCc5CPH%0|r2bQ+`yqn?Y2BS6Z@hP)3SJSQSXt zy9{V4$mDcq1AQPHq|mXx0wm>_4I;o6fEbP-=Quh>8if@nxmg-Sg&URw4Fefe83mSc z$^>};sKnXPzY5hsPGGkqIS6F5Q$bR0MNW!aP)tDtDuU9P?%Y<`PEUHKq&_Y{khLskw=wLX#)5kJuVrE$q$)?C*Na|oSe=fHTg7~3hNslwIe)} z4VdMCs^li0XXB7Vb&x2NScTXWMzJ}JY8`xP4|t(U)h1i8i?XiaSNp>^If%W0^@D)g z0fEWK*>l;q2&uIQ0WAvT5T5MEVWNCPM65!ML+pSsgQScEk7H1ff4CJS5F{3`>4Sjb zbPvl&aSyAEF!nQuj0iGJ25AZK(bm_u05Uus(*n%(^+95Rz97wxULXQ&4wwP5 z6=<(tPLg{?8ql1QWT07Kb85ayGXLFqur?mf(>ACQxP$1_n^#1pu5n!@vLl From f09b8d33cc2c0ca739318e6ad0aeb7f530c21679 Mon Sep 17 00:00:00 2001 From: Teresa Burlingame Date: Fri, 8 May 2026 10:57:59 -0700 Subject: [PATCH 16/21] test fix --- ...05245_2025-03-31_flagsPlausibility.parquet | Bin 2834 -> 2284 bytes .../testthat/test-wrap-precip-pluvio-stats.R | 2 ++ 2 files changed, 2 insertions(+) diff --git a/flow/tests/testthat/pfs/precipPluvioStats/2025/03/31/precip-weighing_SITE001/pluvio/CFGLOC105245/flags/pluvio_CFGLOC105245_2025-03-31_flagsPlausibility.parquet b/flow/tests/testthat/pfs/precipPluvioStats/2025/03/31/precip-weighing_SITE001/pluvio/CFGLOC105245/flags/pluvio_CFGLOC105245_2025-03-31_flagsPlausibility.parquet index f95b5e44ae5503488765451ccabc91869351bbc8..09e08e74815564332bba9e7db6d2e36d1e47f861 100644 GIT binary patch delta 569 zcmbOv_C|2SPo~NIOnrQkHKq&_Y{khLskw=wJd+JsyeBVU(O}Hke3GS?ak3b@*ML(vv^1sZXwCmzaE(%~nlp6Teu68i!aDAA_Wf1dn4-kbk%p#1M%CEczf|mhLq9 z3%i|Kq$3cNI|e#Bnu1t9Ai~K4L;{7Nq>IVq9~|>zD}R z2YQ071S^8t4w82QiMpp`B)dWM=fl)P_2OG=OP< z8c-RaKY2ZeY`qi6Iqs&0N$wS3Z#g?U`Z_wMgY<)a1v1Mq7)%C&Tm_T|x;5F+1tjDI z^bS}GNCf7CJ>m`KLxK^+1grD#%83jQDocq7N=tFiDT#te1I;g2a0EI4$N&clP`M|_ jOdtTd!OHV{5Vzgf2_#C>qF4?JFhESDR?0_(+H^oV+i^Y+AxVityd_SRK=UP~`CPK2 zvhaYZGYboXjXwejrizJ$0SSq@Ebz`wQKO!$d-v|${qA=^|5|$~zCYtwmISz9kYyM~ za-kQQ4f~`wvd8`PhfeM>k}$AZ2c23lKwhsr2wxr5+D{v5tzFcaAQ#~z-(WcCA6Q2; zMNU^@?D5Adt{3d~g}NXoPQ+KNc*4qmyz*CX*coc~yS9y2xCY#cylkMD~cE0!%0@`JhyKRM~TwOW*k@+7wiBB-~Fl z0YE*3B0vRFG%P9r+L$I`h%SR6O`ZGfY?It^-`?n%6+LD?$>~-%H?rD#8Y^anooLkV zO(85y3OnSt`^J_=(IJy>*SqiNRVr*?2Gb?R8 zYozk6y6#K>ZAR;N@ma~D*B&qKDV4IPS!Xy~SOD7Iw|S z=H{$z!|3SEgg$m?tIcHqwYm&&sXv4LR!1M238PeMYQ^P#ExXiDehBMxThV&g7%XyAOCIs(|@BBG-K? uh2m=v0ZO2W1J84|zLW!@m%x#6d0xerT9N29OU%? diff --git a/flow/tests/testthat/test-wrap-precip-pluvio-stats.R b/flow/tests/testthat/test-wrap-precip-pluvio-stats.R index e9f636582..5cac99088 100644 --- a/flow/tests/testthat/test-wrap-precip-pluvio-stats.R +++ b/flow/tests/testthat/test-wrap-precip-pluvio-stats.R @@ -16,7 +16,9 @@ #' pfs/precipPluvioStats/2025/03/31/precip-weighing_SITE001/pluvio/CFGLOC105245/ #' data/ - pluvio_CFGLOC105245_2025-03-31.parquet (60 rows, 1 hr) #' flags/ - pluvio_CFGLOC105245_2025-03-31_flagsCal.parquet +#' (validCalQF, suspectCalQF) #' pluvio_CFGLOC105245_2025-03-31_flagsPlausibility.parquet +#' (nullQF, rangeQF, gapQF, sensorErrorQF, heaterErrorQF) #' uncertainty_coef/ - pluvio_CFGLOC105245_2025-03-31_uncertaintyCoef.json #' (U_CVALA1 = 0.02) From 7edd29c8b8e4bbe4de54aee7914a926fd41a7c05 Mon Sep 17 00:00:00 2001 From: burlingamet Date: Fri, 8 May 2026 12:08:07 -0600 Subject: [PATCH 17/21] changing which flags feed to finalQF --- .../wrap.precip.pluvio.stats.R | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/flow/flow.precip.pluvio.stats/wrap.precip.pluvio.stats.R b/flow/flow.precip.pluvio.stats/wrap.precip.pluvio.stats.R index 81170d8bb..cc45d315d 100644 --- a/flow/flow.precip.pluvio.stats/wrap.precip.pluvio.stats.R +++ b/flow/flow.precip.pluvio.stats/wrap.precip.pluvio.stats.R @@ -265,7 +265,7 @@ wrap.precip.pluvio.stats <- function(DirIn, )] # Calculate finalQF for 1-minute data - stats_01min[, finalQF := pmax(nullQF, extremePrecipQF, gapQF, sensorErrorQF, heaterErrorQF, na.rm = TRUE)] + stats_01min[, finalQF := pmax(nullQF, extremePrecipQF, gapQF, sensorErrorQF, heaterErrorQF, suspectCalQF, na.rm = TRUE)] # Create 30-minute time groups stats_01min[, time_group := floor_date(startDateTime, "30 mins")] @@ -295,18 +295,11 @@ wrap.precip.pluvio.stats <- function(DirIn, ), by = time_group] # Update finalQF based on the aggregated flags - stats_30min[, finalQF := pmax(nullQF, extremePrecipQF, gapQF, sensorErrorQF, heaterErrorQF, validCalQF, suspectCalQF, na.rm = TRUE)] + stats_30min[, finalQF := pmax(nullQF, extremePrecipQF, gapQF, sensorErrorQF, heaterErrorQF, suspectCalQF, na.rm = TRUE)] # Clean up temporary columns stats_01min[, time_group := NULL] stats_30min[, time_group := NULL] - # Clean up - stats_01min[, startDateTime_1min := NULL] - stats_30min[, startDateTime_1min := NULL] - - # Clean up - stats_01min[, endDateTime_1min := NULL] - stats_30min[, endDateTime_1min := NULL] # Reorder columns to match schema requirements col_order <- c('startDateTime', 'endDateTime', 'precipBulk', 'precipBulkExpUncert', 'precipNumPts', From c5d15db9c5e5487d173c262127be34c91d4fe750 Mon Sep 17 00:00:00 2001 From: Teresa Burlingame Date: Mon, 11 May 2026 13:55:06 -0400 Subject: [PATCH 18/21] updating script to give -1 values for missing cal flag, and put ucrt at NA when all data is NA --- .../wrap.precip.pluvio.flags.R | 2 +- .../wrap.precip.pluvio.stats.R | 16 +++++++++++++++- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/flow/flow.precip.pluvio.flags/wrap.precip.pluvio.flags.R b/flow/flow.precip.pluvio.flags/wrap.precip.pluvio.flags.R index 58315df3a..ffcc91bac 100644 --- a/flow/flow.precip.pluvio.flags/wrap.precip.pluvio.flags.R +++ b/flow/flow.precip.pluvio.flags/wrap.precip.pluvio.flags.R @@ -173,7 +173,7 @@ wrap.precip.pluvio.flags<- function(DirIn, if (data$inletTemp[i] > 0 ) { qfPlau$heaterErrorQF[i] <- 0 } - if (data$inletTemp[i] < 0 & data$cell_temperature[i] > -10 ) { #see if there's a threshold where it stops trying to heat? + if (data$inletTemp[i] < 0 & data$cell_temperature[i] > -40 ) { #see if there's a threshold where it stops trying to heat? qfPlau$heaterErrorQF[i] <- 1 } } diff --git a/flow/flow.precip.pluvio.stats/wrap.precip.pluvio.stats.R b/flow/flow.precip.pluvio.stats/wrap.precip.pluvio.stats.R index cc45d315d..47b8af1a7 100644 --- a/flow/flow.precip.pluvio.stats/wrap.precip.pluvio.stats.R +++ b/flow/flow.precip.pluvio.stats/wrap.precip.pluvio.stats.R @@ -164,6 +164,20 @@ wrap.precip.pluvio.stats <- function(DirIn, data <- merge(data, qfCal, by = 'readout_time', all = TRUE) data <- merge(data, qfPlau, by = 'readout_time', all = TRUE) + # If validCalQF or suspectCalQF are missing/NA, set to -1 + if (!('validCalQF' %in% names(data)) || all(is.na(data$validCalQF))) { + data[, validCalQF := -1L] + log$warn('validCalQF not found or all NA — setting to -1') + } else { + data[is.na(validCalQF), validCalQF := -1L] + } + if (!('suspectCalQF' %in% names(data)) || all(is.na(data$suspectCalQF))) { + data[, suspectCalQF := -1L] + log$warn('suspectCalQF not found or all NA — setting to -1') + } else { + data[is.na(suspectCalQF), suspectCalQF := -1L] + } + # Read uncertainty coefficients fileUcrt <- base::dir(dir_paths$uncertainty_coef) @@ -275,7 +289,7 @@ wrap.precip.pluvio.stats <- function(DirIn, startDateTime = min(startDateTime), endDateTime = max(endDateTime), precipBulk = ifelse(all(is.na(precipBulk)), NA_real_, sum(precipBulk, na.rm = TRUE)), - precipBulkExpUncert = sqrt(sum(precipBulkExpUncert^2, na.rm = TRUE)) * 2, # Quadrature sum with 2x multiplier + precipBulkExpUncert = ifelse(all(is.na(precipBulkExpUncert)), NA_real_, sqrt(sum(precipBulkExpUncert^2, na.rm = TRUE)) * 2), # Quadrature sum with 2x multiplier precipNumPts = sum(precipNumPts, na.rm = TRUE), # Sum the counts from 1-minute intervals nullQF = as.integer(ifelse(mean(nullQF == 1, na.rm = TRUE) >= 0.1, 1L, ifelse(all(is.na(nullQF)), NA_integer_, min(nullQF, na.rm = TRUE)))), From ee7ba4a92f8ac49c7d3c499c483a246e76ec863a Mon Sep 17 00:00:00 2001 From: Teresa Burlingame Date: Mon, 11 May 2026 14:27:55 -0400 Subject: [PATCH 19/21] pipeline sha tags --- .../precipWeighingv2/precipWeighingv2_analyze_pad_and_qaqc.yaml | 2 +- .../precipWeighingv2_qm_stats_group_and_compute.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pipe/precipWeighingv2/precipWeighingv2_analyze_pad_and_qaqc.yaml b/pipe/precipWeighingv2/precipWeighingv2_analyze_pad_and_qaqc.yaml index 2f9270d57..f7aec9f15 100644 --- a/pipe/precipWeighingv2/precipWeighingv2_analyze_pad_and_qaqc.yaml +++ b/pipe/precipWeighingv2/precipWeighingv2_analyze_pad_and_qaqc.yaml @@ -4,7 +4,7 @@ pipeline: transform: image_pull_secrets: - battelleecology-quay-read-all-pull-secret - image: us-central1-docker.pkg.dev/neon-shared-service/neonscience/neon-is-pluvio-flgs:sha-fad5b39 + image: us-central1-docker.pkg.dev/neon-shared-service/neonscience/neon-is-pluvio-flgs:sha-c5d15db cmd: - sh - "-c" diff --git a/pipe/precipWeighingv2/precipWeighingv2_qm_stats_group_and_compute.yaml b/pipe/precipWeighingv2/precipWeighingv2_qm_stats_group_and_compute.yaml index 58334ac82..146b4c901 100644 --- a/pipe/precipWeighingv2/precipWeighingv2_qm_stats_group_and_compute.yaml +++ b/pipe/precipWeighingv2/precipWeighingv2_qm_stats_group_and_compute.yaml @@ -2,7 +2,7 @@ pipeline: name: precipWeighingv2_qm_stats_group_and_compute transform: - image: us-central1-docker.pkg.dev/neon-shared-service/neonscience/neon-is-pluvio-stats:sha-fad5b39 + image: us-central1-docker.pkg.dev/neon-shared-service/neonscience/neon-is-pluvio-stats:sha-c5d15db cmd: - sh - "-c" From 2e24288db9057ec61e326aa40debbda6983e371c Mon Sep 17 00:00:00 2001 From: Teresa Burlingame Date: Mon, 11 May 2026 14:53:25 -0400 Subject: [PATCH 20/21] update to sensor status stream rather than qf stream --- flow/flow.precip.pluvio.flags/wrap.precip.pluvio.flags.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flow/flow.precip.pluvio.flags/wrap.precip.pluvio.flags.R b/flow/flow.precip.pluvio.flags/wrap.precip.pluvio.flags.R index ffcc91bac..b60227edb 100644 --- a/flow/flow.precip.pluvio.flags/wrap.precip.pluvio.flags.R +++ b/flow/flow.precip.pluvio.flags/wrap.precip.pluvio.flags.R @@ -133,7 +133,7 @@ wrap.precip.pluvio.flags<- function(DirIn, #bitwise calculation of flags of interest - for (i in seq_along(data$sensorErrorQF)) { + for (i in seq_along(data$sensorStatus)) { if (is.na(data$sensorStatus[i])) { qfPlau$sensorErrorQF[i] <- -1 } else { From cb3e16cadeaa12252d8dccedfbad2e8873cfb4de Mon Sep 17 00:00:00 2001 From: Teresa Burlingame Date: Fri, 15 May 2026 12:47:56 -0400 Subject: [PATCH 21/21] adding insuffDataQF to test --- .../flow.precip.pluvio.stats.R | 25 +++++++---- .../wrap.precip.pluvio.stats.R | 44 +++++++++++++------ .../testthat/test-wrap-precip-pluvio-stats.R | 22 +++++++--- 3 files changed, 63 insertions(+), 28 deletions(-) diff --git a/flow/flow.precip.pluvio.stats/flow.precip.pluvio.stats.R b/flow/flow.precip.pluvio.stats/flow.precip.pluvio.stats.R index 29f026b60..9c99e2364 100644 --- a/flow/flow.precip.pluvio.stats/flow.precip.pluvio.stats.R +++ b/flow/flow.precip.pluvio.stats/flow.precip.pluvio.stats.R @@ -138,7 +138,8 @@ Para <- ), NameParaOptn = c( "DirSubCopy", - "FileSchmData" + "FileSchmData01", + "FileSchmData30" ), log = log ) @@ -148,15 +149,20 @@ log$debug(base::paste0('Input directory: ', Para$DirIn)) log$debug(base::paste0('Output directory: ', Para$DirOut)) log$debug(base::paste0('Error directory: ', Para$DirErr)) -# Retrieve output schema for stats -FileSchmData <- Para$FileSchmData +# Retrieve output schemas for 1-min and 30-min stats +FileSchmData01 <- Para$FileSchmData01 +FileSchmData30 <- Para$FileSchmData30 -#one or more will always be null -# Read in the schema -if(base::is.null(FileSchmData) || FileSchmData == 'NA'){ - FileSchmData <- NULL +# Read in the schemas +if(base::is.null(FileSchmData01) || FileSchmData01 == 'NA'){ + FileSchmData01 <- NULL } else { - FileSchmData <- base::paste0(base::readLines(FileSchmData),collapse='') + FileSchmData01 <- base::paste0(base::readLines(FileSchmData01),collapse='') +} +if(base::is.null(FileSchmData30) || FileSchmData30 == 'NA'){ + FileSchmData30 <- NULL +} else { + FileSchmData30 <- base::paste0(base::readLines(FileSchmData30),collapse='') } # Retrieve optional subdirectories to copy over @@ -189,7 +195,8 @@ foreach::foreach(idxDirIn = DirIn) %dopar% { withCallingHandlers( wrap.precip.pluvio.stats(DirIn=idxDirIn, DirOutBase=Para$DirOut, - SchmData = FileSchmData, + SchmData01 = FileSchmData01, + SchmData30 = FileSchmData30, DirSubCopy=DirSubCopy, log=log ), diff --git a/flow/flow.precip.pluvio.stats/wrap.precip.pluvio.stats.R b/flow/flow.precip.pluvio.stats/wrap.precip.pluvio.stats.R index 47b8af1a7..bf4affae1 100644 --- a/flow/flow.precip.pluvio.stats/wrap.precip.pluvio.stats.R +++ b/flow/flow.precip.pluvio.stats/wrap.precip.pluvio.stats.R @@ -71,7 +71,8 @@ ############################################################################################## wrap.precip.pluvio.stats <- function(DirIn, DirOutBase, - SchmData = NULL, #new schema with all data + SchmData01 = NULL, #schema for 1-minute output + SchmData30 = NULL, #schema for 30-minute output (includes insuffDataQF) DirSubCopy = NULL, log = NULL) { @@ -308,33 +309,47 @@ wrap.precip.pluvio.stats <- function(DirIn, ifelse(all(is.na(heaterErrorQF)), NA_integer_, min(heaterErrorQF, na.rm = TRUE)))) ), by = time_group] - # Update finalQF based on the aggregated flags - stats_30min[, finalQF := pmax(nullQF, extremePrecipQF, gapQF, sensorErrorQF, heaterErrorQF, suspectCalQF, na.rm = TRUE)] + # Compute insuffDataQF for 30-minute data: + # Flag when null QF has not triggered but data is incomplete AND measurable precip was recorded. + # insuffDataQF = 1: nullQF=0 (pass), precipNumPts < 30, and precipBulk > 0 (potential missed precip) + # insuffDataQF = 0: full data (precipNumPts >= 30), or no measurable precip + # insuffDataQF = -1: null flag already triggered (redundant to flag insufficiency separately) + stats_30min[, insuffDataQF := as.integer( + fifelse(nullQF == 1L, -1L, + fifelse(precipNumPts >= 30L, 0L, + fifelse(is.na(precipBulk) | precipBulk <= 0, 0L, 1L))) + )] + + # Update finalQF based on the aggregated flags, including insuffDataQF + stats_30min[, finalQF := pmax(nullQF, extremePrecipQF, gapQF, sensorErrorQF, heaterErrorQF, suspectCalQF, insuffDataQF, na.rm = TRUE)] # Clean up temporary columns stats_01min[, time_group := NULL] stats_30min[, time_group := NULL] - # Reorder columns to match schema requirements - col_order <- c('startDateTime', 'endDateTime', 'precipBulk', 'precipBulkExpUncert', 'precipNumPts', - 'nullQF', 'gapQF', 'extremePrecipQF', 'heaterErrorQF', - 'sensorErrorQF', 'validCalQF', 'suspectCalQF', 'finalQF') - - stats_aggr01 <- stats_01min[, ..col_order] - stats_aggr30 <- stats_30min[, ..col_order] + # Reorder columns to match schema requirements + col_order_01 <- c('startDateTime', 'endDateTime', 'precipBulk', 'precipBulkExpUncert', 'precipNumPts', + 'nullQF', 'gapQF', 'extremePrecipQF', 'heaterErrorQF', + 'sensorErrorQF', 'validCalQF', 'suspectCalQF', 'finalQF') + col_order_30 <- c('startDateTime', 'endDateTime', 'precipBulk', 'precipBulkExpUncert', 'precipNumPts', + 'insuffDataQF', 'nullQF', 'gapQF', 'extremePrecipQF', 'heaterErrorQF', + 'sensorErrorQF', 'validCalQF', 'suspectCalQF', 'finalQF') + + stats_aggr01 <- stats_01min[, ..col_order_01] + stats_aggr30 <- stats_30min[, ..col_order_30] # Convert back to data.frame if needed for writing setDF(stats_aggr01) setDF(stats_aggr30) # Write output files - write_output_files(stats_aggr01, stats_aggr30, files$data, dirOutStat, SchmData, log) + write_output_files(stats_aggr01, stats_aggr30, files$data, dirOutStat, SchmData01, SchmData30, log) return() } # Helper function for file writing -write_output_files <- function(stats_01, stats_30, fileData, dirOutStat, SchmData, log) { +write_output_files <- function(stats_01, stats_30, fileData, dirOutStat, SchmData01, SchmData30, log) { if (is.na(fileData)) return() # Create output filenames @@ -349,7 +364,8 @@ write_output_files <- function(stats_01, stats_30, fileData, dirOutStat, SchmDat file_paths <- fs::path(dirOutStat, file_names) datasets <- list(stats_01, stats_30) - + schm_list <- list(SchmData01, SchmData30) + # Write files for (i in seq_along(file_paths)) { rptWrte <- base::try( @@ -357,7 +373,7 @@ write_output_files <- function(stats_01, stats_30, fileData, dirOutStat, SchmDat data = datasets[[i]], NameFile = file_paths[i], NameFileSchm = NULL, - Schm = SchmData, + Schm = schm_list[[i]], log = log ), silent = TRUE diff --git a/flow/tests/testthat/test-wrap-precip-pluvio-stats.R b/flow/tests/testthat/test-wrap-precip-pluvio-stats.R index 5cac99088..fc6367612 100644 --- a/flow/tests/testthat/test-wrap-precip-pluvio-stats.R +++ b/flow/tests/testthat/test-wrap-precip-pluvio-stats.R @@ -90,7 +90,7 @@ test_that("Unit test of wrap.precip.pluvio.stats.R", { testthat::expect_true(all(expected_cols %in% names(stats_001))) # -------------------------------------------------------------------------------------------- - # Test 4: 30-min output schema contains all expected columns + # Test 4: 30-min output schema contains all expected columns including insuffDataQF # -------------------------------------------------------------------------------------------- stats_030 <- NEONprocIS.base::def.read.parq.ds( fileIn = file_030, @@ -99,7 +99,11 @@ test_that("Unit test of wrap.precip.pluvio.stats.R", { Df = TRUE ) - testthat::expect_true(all(expected_cols %in% names(stats_030))) + expected_cols_030 <- c(expected_cols[1:5], 'insuffDataQF', expected_cols[6:length(expected_cols)]) + testthat::expect_true(all(expected_cols_030 %in% names(stats_030))) + + # insuffDataQF must not appear in 1-min output + testthat::expect_false('insuffDataQF' %in% names(stats_001)) # -------------------------------------------------------------------------------------------- # Test 5: 30-min output has fewer rows than 1-min output @@ -133,20 +137,28 @@ test_that("Unit test of wrap.precip.pluvio.stats.R", { # -------------------------------------------------------------------------------------------- # Test 10: Quality flag values are within the expected set {-1, 0, 1} # -------------------------------------------------------------------------------------------- - qf_cols <- c('nullQF', 'gapQF', 'extremePrecipQF', 'heaterErrorQF', - 'sensorErrorQF', 'finalQF') + qf_cols_001 <- c('nullQF', 'gapQF', 'extremePrecipQF', 'heaterErrorQF', + 'sensorErrorQF', 'finalQF') + qf_cols_030 <- c(qf_cols_001, 'insuffDataQF') - for (col in qf_cols) { + for (col in qf_cols_001) { testthat::expect_true( all(stats_001[[col]] %in% c(-1L, 0L, 1L), na.rm = TRUE), info = paste0('Column ', col, ' in 1-min output has unexpected values') ) + } + for (col in qf_cols_030) { testthat::expect_true( all(stats_030[[col]] %in% c(-1L, 0L, 1L), na.rm = TRUE), info = paste0('Column ', col, ' in 30-min output has unexpected values') ) } + # -------------------------------------------------------------------------------------------- + # Test 10b: insuffDataQF is 0 when precipNumPts == 30 (fixture has complete data) + # -------------------------------------------------------------------------------------------- + testthat::expect_true(all(stats_030$insuffDataQF == 0L, na.rm = TRUE)) + # -------------------------------------------------------------------------------------------- # Test 11: precipNumPts equals 1 for each non-NA 1-min row and sums to 30 per 30-min interval # --------------------------------------------------------------------------------------------