Skip to content
Draft

Pluvio #1191

Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
66a9b5c
Merge remote-tracking branch 'origin' into pluvio
burlingamet Jul 23, 2025
9bc5a8d
Merge branch 'master' of github.com:NEONScience/NEON-IS-data-processi…
burlingamet Jul 23, 2025
d06532f
Merge branch 'master' of github.com:NEONScience/NEON-IS-data-processi…
burlingamet Jul 23, 2025
ab86e58
Merge branch 'pluvio' of github.com:NEONScience/NEON-IS-data-processi…
covesturtevant Aug 13, 2025
0121c9d
Merge branch 'master' of github.com:NEONScience/NEON-IS-data-processi…
burlingamet Aug 18, 2025
14d2003
Merge branch 'master' of github.com:NEONScience/NEON-IS-data-processi…
burlingamet Sep 10, 2025
e5e5bbf
Merge branch 'pluvio' of github.com:NEONScience/NEON-IS-data-processi…
burlingamet Sep 10, 2025
68836bb
Merge branch 'master' into pluvio
covesturtevant Sep 10, 2025
bb548a2
Merge branch 'pluvio' of github.com:NEONScience/NEON-IS-data-processi…
covesturtevant Sep 10, 2025
902a75c
Merge branch 'master' of github.com:NEONScience/NEON-IS-data-processi…
burlingamet Sep 24, 2025
832b3f6
change plvuio to look at calibration loader again
burlingamet Sep 24, 2025
7b2e6c9
updating dates and turning off cron so it doesn't continue to run.
burlingamet Sep 24, 2025
8a6e584
Merge branch 'master' of github.com:NEONScience/NEON-IS-data-processi…
burlingamet Oct 13, 2025
328dc41
Merge branch 'master' of github.com:NEONScience/NEON-IS-data-processi…
burlingamet Oct 15, 2025
d2b99ca
testing different source type for cal loader
burlingamet Oct 15, 2025
96f169f
change back to pluvio raw
burlingamet Oct 16, 2025
24ea5bd
use test image for calibration loader that differentiates between sen…
covesturtevant Oct 17, 2025
efae01f
Merge branch 'master' into pluvio
covesturtevant Oct 17, 2025
cb47ac1
merge master, resolve conflict
covesturtevant Oct 17, 2025
51d53c9
merge master
burlingamet Mar 17, 2026
7838b5b
Add WREFDFIR and change cron to current time window
burlingamet Mar 17, 2026
af2ea60
Merge branch 'master' of github.com:NEONScience/NEON-IS-data-processi…
burlingamet Mar 20, 2026
29611ec
Merge branch 'master' of github.com:NEONScience/NEON-IS-data-processi…
burlingamet Apr 7, 2026
fad5b39
pluvio updates to only use one schema. Change logic in SUM to not hav…
burlingamet Apr 8, 2026
07abeb1
update cron for more dates, sha tag testing of updates to pluvio code…
burlingamet Apr 8, 2026
df4d5ee
pulling cron back to have full month of data
burlingamet Apr 8, 2026
7329feb
Merge branch 'master' of github.com:NEONScience/NEON-IS-data-processi…
burlingamet Apr 16, 2026
7b48b6f
add pub pipes
burlingamet Apr 16, 2026
6a9ed9c
Merge branch 'master' of github.com:NEONScience/NEON-IS-data-processi…
burlingamet May 8, 2026
2833cdb
changing flag logic to flag for cold temps rather than use heater sta…
burlingamet May 8, 2026
172a7d4
updating precip flagging and adding unit tests.
burlingamet May 8, 2026
5ad3b3b
strip stale schema if schemaQF is null
burlingamet May 8, 2026
0c5951a
didn't indext
burlingamet May 8, 2026
7cf1c96
data fix
burlingamet May 8, 2026
f09b8d3
test fix
burlingamet May 8, 2026
7edd29c
changing which flags feed to finalQF
burlingamet May 8, 2026
c5d15db
updating script to give -1 values for missing cal flag, and put ucrt …
burlingamet May 11, 2026
ee7ba4a
pipeline sha tags
burlingamet May 11, 2026
2e24288
update to sensor status stream rather than qf stream
burlingamet May 11, 2026
cb3e16c
adding insuffDataQF to test
burlingamet May 15, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 28 additions & 61 deletions flow/flow.precip.pluvio.flags/wrap.precip.pluvio.flags.R
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
#' Teresa Burlingame \email{tburlingame@battelleecology.org} \cr

#' @description Workflow. Compute the heater and status flags by assessing the bit rate. Only
#' flagging alarm codes of interest.
#' flagging alarm codes of interest. Flag heater error if inlet temp is below freezing in near freezing conditions
#'
#' @param DirIn Character value. The input path to the data from a single source ID, structured as follows:
#' #/pfs/BASE_REPO/#/yyyy/mm/dd/#/location-id, where # indicates any number of parent and child directories
Expand Down Expand Up @@ -67,6 +67,10 @@
# changelog and author contributions / copyrights
# Teresa Burlingame (2025-04-15)
# Initial creation
# Teresa Burlingame (2026-04-08)
# removing unnecessary block and adding in schema to report write
# Teresa Burlingame (2026-05-08)
# changing test so that it looks at inlet temp rather than status
##############################################################################################
wrap.precip.pluvio.flags<- function(DirIn,
DirOutBase,
Expand Down Expand Up @@ -118,15 +122,6 @@ wrap.precip.pluvio.flags<- function(DirIn,
Df=TRUE,
log=log)

## wipe preexisting schema TODO check with Cove
# Remove the "schema" attribute
#remove existing schema from plau so we can add more cols.
if (is.null(SchmQf)){
base::attr(qfPlau, "schema") <- NULL
} else {
base::attr(qfPlau, "schema") <- SchmQf
}

# if there are no heater streams add them in as NA
if(!('heater_status' %in% names(data))){
data$heater_status <- NA
Expand All @@ -138,7 +133,7 @@ wrap.precip.pluvio.flags<- function(DirIn,

#bitwise calculation of flags of interest

for (i in seq_along(data$sensorErrorQF)) {
for (i in seq_along(data$sensorStatus)) {
if (is.na(data$sensorStatus[i])) {
qfPlau$sensorErrorQF[i] <- -1
} else {
Expand Down Expand Up @@ -169,64 +164,36 @@ wrap.precip.pluvio.flags<- function(DirIn,
}
}

#heater status for bit vals of interest
for (i in seq_along(data$heater_status)) {
if (is.na(data$heater_status[i])) {
qfPlau$heaterErrorQF[i] <- -1
} else {
if (data$heater_status[i] == 0) {
qfPlau$heaterErrorQF[i] <- 0
}
if ((data$heater_status[i] / 2^5) %% 2 >= 1) { #functional check failed
qfPlau$heaterErrorQF[i] <- 1
}
if ((data$heater_status[i] / 2^7) %% 2 >= 1) { #heater deactivated or not present
qfPlau$heaterErrorQF[i] <- 1
}
}
}

# "pass through" of data
# TODO ask Cove if this is necessary?
# qfs added to list of flags to process through qm module.
#
# #get file name based on date of data in directory
# nameFileOut <- fileData
#
# # Write out the time shifted dataset to file
# fileOut <- fs::path(dirOutData,nameFileOut)
#
# rptWrte <-
# base::try(NEONprocIS.base::def.wrte.parq(
# data = data,
# NameFile = fileOut,
# log=log
# ),
# silent = TRUE)
#
# if ('try-error' %in% base::class(rptWrte)) {
# log$error(base::paste0(
# 'Cannot write output to ',
# fileOut,
# '. ',
# attr(rptWrte, "condition")
# ))
# stop()
# } else {
# log$info(base::paste0(
# 'Data file written to file ',
# fileOut
# ))
# }

# If inlet temperature indicates that sensor is not adequately heating

for (i in seq_along(data$inletTemp)) {
if (is.na(data$inletTemp[i])) {
qfPlau$heaterErrorQF[i] <- -1
} else {
if (data$inletTemp[i] > 0 ) {
qfPlau$heaterErrorQF[i] <- 0
}
if (data$inletTemp[i] < 0 & data$cell_temperature[i] > -40 ) { #see if there's a threshold where it stops trying to heat?
qfPlau$heaterErrorQF[i] <- 1
}
}
}

nameFileQfOutFlag <- fileQfPlau

nameFileQfOutFlag <- fs::path(dirOutQf,nameFileQfOutFlag)

# Strip inherited parquet schema attribute so def.wrte.parq does not error
# on a column count mismatch after heaterErrorQF and sensorErrorQF were added.
if(base::is.null(SchmQf)){
attr(qfPlau, 'schema') <- NULL
}

rptWrte <-
base::try(NEONprocIS.base::def.wrte.parq(
data = qfPlau,
NameFile = nameFileQfOutFlag,
Schm=SchmQf,
log=log
),
silent = TRUE)
Expand Down
25 changes: 16 additions & 9 deletions flow/flow.precip.pluvio.stats/flow.precip.pluvio.stats.R
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@ Para <-
),
NameParaOptn = c(
"DirSubCopy",
"FileSchmData"
"FileSchmData01",
"FileSchmData30"
),
log = log
)
Comment on lines 138 to 145
Expand All @@ -148,15 +149,20 @@ log$debug(base::paste0('Input directory: ', Para$DirIn))
log$debug(base::paste0('Output directory: ', Para$DirOut))
log$debug(base::paste0('Error directory: ', Para$DirErr))

# Retrieve output schema for stats
FileSchmData <- Para$FileSchmData
# Retrieve output schemas for 1-min and 30-min stats
FileSchmData01 <- Para$FileSchmData01
FileSchmData30 <- Para$FileSchmData30

#one or more will always be null
# Read in the schema
if(base::is.null(FileSchmData) || FileSchmData == 'NA'){
FileSchmData <- NULL
# Read in the schemas
if(base::is.null(FileSchmData01) || FileSchmData01 == 'NA'){
FileSchmData01 <- NULL
} else {
FileSchmData <- base::paste0(base::readLines(FileSchmData),collapse='')
FileSchmData01 <- base::paste0(base::readLines(FileSchmData01),collapse='')
}
if(base::is.null(FileSchmData30) || FileSchmData30 == 'NA'){
FileSchmData30 <- NULL
} else {
FileSchmData30 <- base::paste0(base::readLines(FileSchmData30),collapse='')
}

# Retrieve optional subdirectories to copy over
Expand Down Expand Up @@ -189,7 +195,8 @@ foreach::foreach(idxDirIn = DirIn) %dopar% {
withCallingHandlers(
wrap.precip.pluvio.stats(DirIn=idxDirIn,
DirOutBase=Para$DirOut,
SchmData = FileSchmData,
SchmData01 = FileSchmData01,
SchmData30 = FileSchmData30,
DirSubCopy=DirSubCopy,
log=log
),
Expand Down
73 changes: 49 additions & 24 deletions flow/flow.precip.pluvio.stats/wrap.precip.pluvio.stats.R
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,13 @@
# changelog and author contributions / copyrights
# Teresa Burlingame (2025-07-21)
# Initial creation
# Teresa Burlingame (2025-04-08)
# change sum logic to be NA when all 30 mins are NA.
##############################################################################################
wrap.precip.pluvio.stats <- function(DirIn,
DirOutBase,
SchmData = NULL, #new schema with all data
SchmData01 = NULL, #schema for 1-minute output
SchmData30 = NULL, #schema for 30-minute output (includes insuffDataQF)
DirSubCopy = NULL,
Comment on lines 72 to 76
log = NULL) {

Expand Down Expand Up @@ -162,6 +165,20 @@ wrap.precip.pluvio.stats <- function(DirIn,
data <- merge(data, qfCal, by = 'readout_time', all = TRUE)
data <- merge(data, qfPlau, by = 'readout_time', all = TRUE)

# If validCalQF or suspectCalQF are missing/NA, set to -1
if (!('validCalQF' %in% names(data)) || all(is.na(data$validCalQF))) {
data[, validCalQF := -1L]
log$warn('validCalQF not found or all NA — setting to -1')
} else {
data[is.na(validCalQF), validCalQF := -1L]
}
if (!('suspectCalQF' %in% names(data)) || all(is.na(data$suspectCalQF))) {
data[, suspectCalQF := -1L]
log$warn('suspectCalQF not found or all NA — setting to -1')
} else {
data[is.na(suspectCalQF), suspectCalQF := -1L]
}

# Read uncertainty coefficients
fileUcrt <- base::dir(dir_paths$uncertainty_coef)

Expand Down Expand Up @@ -263,7 +280,7 @@ wrap.precip.pluvio.stats <- function(DirIn,
)]

# Calculate finalQF for 1-minute data
stats_01min[, finalQF := pmax(nullQF, extremePrecipQF, gapQF, sensorErrorQF, heaterErrorQF, na.rm = TRUE)]
stats_01min[, finalQF := pmax(nullQF, extremePrecipQF, gapQF, sensorErrorQF, heaterErrorQF, suspectCalQF, na.rm = TRUE)]

# Create 30-minute time groups
stats_01min[, time_group := floor_date(startDateTime, "30 mins")]
Expand All @@ -272,8 +289,8 @@ wrap.precip.pluvio.stats <- function(DirIn,
stats_30min <- stats_01min[, .(
startDateTime = min(startDateTime),
endDateTime = max(endDateTime),
precipBulk = sum(precipBulk, na.rm = TRUE),
precipBulkExpUncert = sqrt(sum(precipBulkExpUncert^2, na.rm = TRUE)) * 2, # Quadrature sum with 2x multiplier
precipBulk = ifelse(all(is.na(precipBulk)), NA_real_, sum(precipBulk, na.rm = TRUE)),
precipBulkExpUncert = ifelse(all(is.na(precipBulkExpUncert)), NA_real_, sqrt(sum(precipBulkExpUncert^2, na.rm = TRUE)) * 2), # Quadrature sum with 2x multiplier
precipNumPts = sum(precipNumPts, na.rm = TRUE), # Sum the counts from 1-minute intervals
nullQF = as.integer(ifelse(mean(nullQF == 1, na.rm = TRUE) >= 0.1, 1L,
ifelse(all(is.na(nullQF)), NA_integer_, min(nullQF, na.rm = TRUE)))),
Expand All @@ -292,40 +309,47 @@ wrap.precip.pluvio.stats <- function(DirIn,
ifelse(all(is.na(heaterErrorQF)), NA_integer_, min(heaterErrorQF, na.rm = TRUE))))
), by = time_group]

# Update finalQF based on the aggregated flags
stats_30min[, finalQF := pmax(nullQF, extremePrecipQF, gapQF, sensorErrorQF, heaterErrorQF, validCalQF, suspectCalQF, na.rm = TRUE)]
# Compute insuffDataQF for 30-minute data:
# Flag when null QF has not triggered but data is incomplete AND measurable precip was recorded.
# insuffDataQF = 1: nullQF=0 (pass), precipNumPts < 30, and precipBulk > 0 (potential missed precip)
# insuffDataQF = 0: full data (precipNumPts >= 30), or no measurable precip
# insuffDataQF = -1: null flag already triggered (redundant to flag insufficiency separately)
stats_30min[, insuffDataQF := as.integer(
fifelse(nullQF == 1L, -1L,
fifelse(precipNumPts >= 30L, 0L,
fifelse(is.na(precipBulk) | precipBulk <= 0, 0L, 1L)))
)]

# Update finalQF based on the aggregated flags, including insuffDataQF
stats_30min[, finalQF := pmax(nullQF, extremePrecipQF, gapQF, sensorErrorQF, heaterErrorQF, suspectCalQF, insuffDataQF, na.rm = TRUE)]

# Clean up temporary columns
stats_01min[, time_group := NULL]
stats_30min[, time_group := NULL]
# Clean up
stats_01min[, startDateTime_1min := NULL]
stats_30min[, startDateTime_1min := NULL]

# Clean up
stats_01min[, endDateTime_1min := NULL]
stats_30min[, endDateTime_1min := NULL]

# Reorder columns to match schema requirements
col_order <- c('startDateTime', 'endDateTime', 'precipBulk', 'precipBulkExpUncert', 'precipNumPts',
'nullQF', 'gapQF', 'extremePrecipQF', 'heaterErrorQF',
'sensorErrorQF', 'validCalQF', 'suspectCalQF', 'finalQF')

stats_aggr01 <- stats_01min[, ..col_order]
stats_aggr30 <- stats_30min[, ..col_order]
# Reorder columns to match schema requirements
col_order_01 <- c('startDateTime', 'endDateTime', 'precipBulk', 'precipBulkExpUncert', 'precipNumPts',
'nullQF', 'gapQF', 'extremePrecipQF', 'heaterErrorQF',
'sensorErrorQF', 'validCalQF', 'suspectCalQF', 'finalQF')
col_order_30 <- c('startDateTime', 'endDateTime', 'precipBulk', 'precipBulkExpUncert', 'precipNumPts',
'insuffDataQF', 'nullQF', 'gapQF', 'extremePrecipQF', 'heaterErrorQF',
'sensorErrorQF', 'validCalQF', 'suspectCalQF', 'finalQF')

stats_aggr01 <- stats_01min[, ..col_order_01]
stats_aggr30 <- stats_30min[, ..col_order_30]

# Convert back to data.frame if needed for writing
setDF(stats_aggr01)
setDF(stats_aggr30)

# Write output files
write_output_files(stats_aggr01, stats_aggr30, files$data, dirOutStat, SchmData, log)
write_output_files(stats_aggr01, stats_aggr30, files$data, dirOutStat, SchmData01, SchmData30, log)

return()
}

# Helper function for file writing
write_output_files <- function(stats_01, stats_30, fileData, dirOutStat, SchmData, log) {
write_output_files <- function(stats_01, stats_30, fileData, dirOutStat, SchmData01, SchmData30, log) {
if (is.na(fileData)) return()

# Create output filenames
Expand All @@ -340,15 +364,16 @@ write_output_files <- function(stats_01, stats_30, fileData, dirOutStat, SchmDat

file_paths <- fs::path(dirOutStat, file_names)
datasets <- list(stats_01, stats_30)

schm_list <- list(SchmData01, SchmData30)

# Write files
for (i in seq_along(file_paths)) {
rptWrte <- base::try(
NEONprocIS.base::def.wrte.parq(
data = datasets[[i]],
NameFile = file_paths[i],
NameFileSchm = NULL,
Schm = SchmData,
Schm = schm_list[[i]],
log = log
),
silent = TRUE
Expand Down
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
[
{
"Name": "U_CVALA1",
"Value": "0.02",
"start_date": "2024-01-01T00:00:00Z",
"end_date": "2026-12-31T23:59:59Z"
}
]
Loading