diff --git a/flow/flow.precip.pluvio.flags/wrap.precip.pluvio.flags.R b/flow/flow.precip.pluvio.flags/wrap.precip.pluvio.flags.R index 15bbfa0a0..b60227edb 100644 --- a/flow/flow.precip.pluvio.flags/wrap.precip.pluvio.flags.R +++ b/flow/flow.precip.pluvio.flags/wrap.precip.pluvio.flags.R @@ -5,7 +5,7 @@ #' Teresa Burlingame \email{tburlingame@battelleecology.org} \cr #' @description Workflow. Compute the heater and status flags by assessing the bit rate. Only -#' flagging alarm codes of interest. +#' flagging alarm codes of interest. Flag heater error if inlet temp is below freezing in near freezing conditions #' #' @param DirIn Character value. The input path to the data from a single source ID, structured as follows: #' #/pfs/BASE_REPO/#/yyyy/mm/dd/#/location-id, where # indicates any number of parent and child directories @@ -67,6 +67,10 @@ # changelog and author contributions / copyrights # Teresa Burlingame (2025-04-15) # Initial creation +# Teresa Burlingame (2026-04-08) +# removing unnecessary block and adding in schema to report write +# Teresa Burlingame (2026-05-08) +# changing test so that it looks at inlet temp rather than status ############################################################################################## wrap.precip.pluvio.flags<- function(DirIn, DirOutBase, @@ -118,15 +122,6 @@ wrap.precip.pluvio.flags<- function(DirIn, Df=TRUE, log=log) - ## wipe preexisting schema TODO check with Cove - # Remove the "schema" attribute - #remove existing schema from plau so we can add more cols. 
- if (is.null(SchmQf)){ - base::attr(qfPlau, "schema") <- NULL - } else { - base::attr(qfPlau, "schema") <- SchmQf - } - # if there are no heater streams add them in as NA if(!('heater_status' %in% names(data))){ data$heater_status <- NA @@ -138,7 +133,7 @@ wrap.precip.pluvio.flags<- function(DirIn, #bitwise calculation of flags of interest - for (i in seq_along(data$sensorErrorQF)) { + for (i in seq_along(data$sensorStatus)) { if (is.na(data$sensorStatus[i])) { qfPlau$sensorErrorQF[i] <- -1 } else { @@ -169,64 +164,36 @@ wrap.precip.pluvio.flags<- function(DirIn, } } - #heater status for bit vals of interest - for (i in seq_along(data$heater_status)) { - if (is.na(data$heater_status[i])) { - qfPlau$heaterErrorQF[i] <- -1 - } else { - if (data$heater_status[i] == 0) { - qfPlau$heaterErrorQF[i] <- 0 - } - if ((data$heater_status[i] / 2^5) %% 2 >= 1) { #functional check failed - qfPlau$heaterErrorQF[i] <- 1 - } - if ((data$heater_status[i] / 2^7) %% 2 >= 1) { #heater deactivated or not present - qfPlau$heaterErrorQF[i] <- 1 - } - } - } - - # "pass through" of data - # TODO ask Cove if this is necessary? - # qfs added to list of flags to process through qm module. - # - # #get file name based on date of data in directory - # nameFileOut <- fileData - # - # # Write out the time shifted dataset to file - # fileOut <- fs::path(dirOutData,nameFileOut) - # - # rptWrte <- - # base::try(NEONprocIS.base::def.wrte.parq( - # data = data, - # NameFile = fileOut, - # log=log - # ), - # silent = TRUE) - # - # if ('try-error' %in% base::class(rptWrte)) { - # log$error(base::paste0( - # 'Cannot write output to ', - # fileOut, - # '. 
', - # attr(rptWrte, "condition") - # )) - # stop() - # } else { - # log$info(base::paste0( - # 'Data file written to file ', - # fileOut - # )) - # } - + # Flag a heater error when inlet temperature is below freezing while cell temperature is above -40 (sensor not adequately heating); -1 when inlet temperature is NA + + for (i in seq_along(data$inletTemp)) { + if (is.na(data$inletTemp[i])) { + qfPlau$heaterErrorQF[i] <- -1 + } else { + if (data$inletTemp[i] >= 0 ) { + qfPlau$heaterErrorQF[i] <- 0 + } + if (data$inletTemp[i] < 0 && isTRUE(data$cell_temperature[i] > -40) ) { #isTRUE guards NA/missing cell_temperature (flag is left unset then, as for <= -40); see if there's a threshold where it stops trying to heat? + qfPlau$heaterErrorQF[i] <- 1 + } + } + } + nameFileQfOutFlag <- fileQfPlau nameFileQfOutFlag <- fs::path(dirOutQf,nameFileQfOutFlag) + # Strip inherited parquet schema attribute so def.wrte.parq does not error + # on a column count mismatch after heaterErrorQF and sensorErrorQF were added. + if(base::is.null(SchmQf)){ + attr(qfPlau, 'schema') <- NULL + } + rptWrte <- base::try(NEONprocIS.base::def.wrte.parq( data = qfPlau, NameFile = nameFileQfOutFlag, + Schm=SchmQf, log=log ), silent = TRUE) diff --git a/flow/flow.precip.pluvio.stats/flow.precip.pluvio.stats.R b/flow/flow.precip.pluvio.stats/flow.precip.pluvio.stats.R index 29f026b60..9c99e2364 100644 --- a/flow/flow.precip.pluvio.stats/flow.precip.pluvio.stats.R +++ b/flow/flow.precip.pluvio.stats/flow.precip.pluvio.stats.R @@ -138,7 +138,8 @@ Para <- ), NameParaOptn = c( "DirSubCopy", - "FileSchmData" + "FileSchmData01", + "FileSchmData30" ), log = log ) @@ -148,15 +149,20 @@ log$debug(base::paste0('Input directory: ', Para$DirIn)) log$debug(base::paste0('Output directory: ', Para$DirOut)) log$debug(base::paste0('Error directory: ', Para$DirErr)) -# Retrieve output schema for stats -FileSchmData <- Para$FileSchmData +# Retrieve output schemas for 1-min and 30-min stats +FileSchmData01 <- Para$FileSchmData01 +FileSchmData30 <- Para$FileSchmData30 -#one or more will always be null -# Read in the schema -if(base::is.null(FileSchmData) || FileSchmData == 'NA'){ - FileSchmData <- NULL +# 
Read in the schemas +if(base::is.null(FileSchmData01) || FileSchmData01 == 'NA'){ + FileSchmData01 <- NULL } else { - FileSchmData <- base::paste0(base::readLines(FileSchmData),collapse='') + FileSchmData01 <- base::paste0(base::readLines(FileSchmData01),collapse='') +} +if(base::is.null(FileSchmData30) || FileSchmData30 == 'NA'){ + FileSchmData30 <- NULL +} else { + FileSchmData30 <- base::paste0(base::readLines(FileSchmData30),collapse='') } # Retrieve optional subdirectories to copy over @@ -189,7 +195,8 @@ foreach::foreach(idxDirIn = DirIn) %dopar% { withCallingHandlers( wrap.precip.pluvio.stats(DirIn=idxDirIn, DirOutBase=Para$DirOut, - SchmData = FileSchmData, + SchmData01 = FileSchmData01, + SchmData30 = FileSchmData30, DirSubCopy=DirSubCopy, log=log ), diff --git a/flow/flow.precip.pluvio.stats/wrap.precip.pluvio.stats.R b/flow/flow.precip.pluvio.stats/wrap.precip.pluvio.stats.R index 45f38d02d..bf4affae1 100644 --- a/flow/flow.precip.pluvio.stats/wrap.precip.pluvio.stats.R +++ b/flow/flow.precip.pluvio.stats/wrap.precip.pluvio.stats.R @@ -66,10 +66,13 @@ # changelog and author contributions / copyrights # Teresa Burlingame (2025-07-21) # Initial creation +# Teresa Burlingame (2026-04-08) +# change sum logic to be NA when all 30 mins are NA. 
############################################################################################## wrap.precip.pluvio.stats <- function(DirIn, DirOutBase, - SchmData = NULL, #new schema with all data + SchmData01 = NULL, #schema for 1-minute output + SchmData30 = NULL, #schema for 30-minute output (includes insuffDataQF) DirSubCopy = NULL, log = NULL) { @@ -162,6 +165,20 @@ wrap.precip.pluvio.stats <- function(DirIn, data <- merge(data, qfCal, by = 'readout_time', all = TRUE) data <- merge(data, qfPlau, by = 'readout_time', all = TRUE) + # If validCalQF or suspectCalQF are missing/NA, set to -1 + if (!('validCalQF' %in% names(data)) || all(is.na(data$validCalQF))) { + data[, validCalQF := -1L] + log$warn('validCalQF not found or all NA — setting to -1') + } else { + data[is.na(validCalQF), validCalQF := -1L] + } + if (!('suspectCalQF' %in% names(data)) || all(is.na(data$suspectCalQF))) { + data[, suspectCalQF := -1L] + log$warn('suspectCalQF not found or all NA — setting to -1') + } else { + data[is.na(suspectCalQF), suspectCalQF := -1L] + } + # Read uncertainty coefficients fileUcrt <- base::dir(dir_paths$uncertainty_coef) @@ -263,7 +280,7 @@ wrap.precip.pluvio.stats <- function(DirIn, )] # Calculate finalQF for 1-minute data - stats_01min[, finalQF := pmax(nullQF, extremePrecipQF, gapQF, sensorErrorQF, heaterErrorQF, na.rm = TRUE)] + stats_01min[, finalQF := pmax(nullQF, extremePrecipQF, gapQF, sensorErrorQF, heaterErrorQF, suspectCalQF, na.rm = TRUE)] # Create 30-minute time groups stats_01min[, time_group := floor_date(startDateTime, "30 mins")] @@ -272,8 +289,8 @@ wrap.precip.pluvio.stats <- function(DirIn, stats_30min <- stats_01min[, .( startDateTime = min(startDateTime), endDateTime = max(endDateTime), - precipBulk = sum(precipBulk, na.rm = TRUE), - precipBulkExpUncert = sqrt(sum(precipBulkExpUncert^2, na.rm = TRUE)) * 2, # Quadrature sum with 2x multiplier + precipBulk = ifelse(all(is.na(precipBulk)), NA_real_, sum(precipBulk, na.rm = TRUE)), + 
precipBulkExpUncert = ifelse(all(is.na(precipBulkExpUncert)), NA_real_, sqrt(sum(precipBulkExpUncert^2, na.rm = TRUE)) * 2), # Quadrature sum with 2x multiplier precipNumPts = sum(precipNumPts, na.rm = TRUE), # Sum the counts from 1-minute intervals nullQF = as.integer(ifelse(mean(nullQF == 1, na.rm = TRUE) >= 0.1, 1L, ifelse(all(is.na(nullQF)), NA_integer_, min(nullQF, na.rm = TRUE)))), @@ -292,40 +309,47 @@ wrap.precip.pluvio.stats <- function(DirIn, ifelse(all(is.na(heaterErrorQF)), NA_integer_, min(heaterErrorQF, na.rm = TRUE)))) ), by = time_group] - # Update finalQF based on the aggregated flags - stats_30min[, finalQF := pmax(nullQF, extremePrecipQF, gapQF, sensorErrorQF, heaterErrorQF, validCalQF, suspectCalQF, na.rm = TRUE)] + # Compute insuffDataQF for 30-minute data: + # Flag when null QF has not triggered but data is incomplete AND measurable precip was recorded. + # insuffDataQF = 1: nullQF=0 (pass), precipNumPts < 30, and precipBulk > 0 (potential missed precip) + # insuffDataQF = 0: full data (precipNumPts >= 30), or no measurable precip + # insuffDataQF = -1: null flag already triggered (redundant to flag insufficiency separately) + stats_30min[, insuffDataQF := as.integer( + fifelse(nullQF == 1L, -1L, + fifelse(precipNumPts >= 30L, 0L, + fifelse(is.na(precipBulk) | precipBulk <= 0, 0L, 1L))) + )] + + # Update finalQF based on the aggregated flags, including insuffDataQF + stats_30min[, finalQF := pmax(nullQF, extremePrecipQF, gapQF, sensorErrorQF, heaterErrorQF, suspectCalQF, insuffDataQF, na.rm = TRUE)] # Clean up temporary columns stats_01min[, time_group := NULL] stats_30min[, time_group := NULL] - # Clean up - stats_01min[, startDateTime_1min := NULL] - stats_30min[, startDateTime_1min := NULL] - - # Clean up - stats_01min[, endDateTime_1min := NULL] - stats_30min[, endDateTime_1min := NULL] - - # Reorder columns to match schema requirements - col_order <- c('startDateTime', 'endDateTime', 'precipBulk', 'precipBulkExpUncert', 'precipNumPts', - 
'nullQF', 'gapQF', 'extremePrecipQF', 'heaterErrorQF', - 'sensorErrorQF', 'validCalQF', 'suspectCalQF', 'finalQF') - stats_aggr01 <- stats_01min[, ..col_order] - stats_aggr30 <- stats_30min[, ..col_order] + # Reorder columns to match schema requirements + col_order_01 <- c('startDateTime', 'endDateTime', 'precipBulk', 'precipBulkExpUncert', 'precipNumPts', + 'nullQF', 'gapQF', 'extremePrecipQF', 'heaterErrorQF', + 'sensorErrorQF', 'validCalQF', 'suspectCalQF', 'finalQF') + col_order_30 <- c('startDateTime', 'endDateTime', 'precipBulk', 'precipBulkExpUncert', 'precipNumPts', + 'insuffDataQF', 'nullQF', 'gapQF', 'extremePrecipQF', 'heaterErrorQF', + 'sensorErrorQF', 'validCalQF', 'suspectCalQF', 'finalQF') + + stats_aggr01 <- stats_01min[, ..col_order_01] + stats_aggr30 <- stats_30min[, ..col_order_30] # Convert back to data.frame if needed for writing setDF(stats_aggr01) setDF(stats_aggr30) # Write output files - write_output_files(stats_aggr01, stats_aggr30, files$data, dirOutStat, SchmData, log) + write_output_files(stats_aggr01, stats_aggr30, files$data, dirOutStat, SchmData01, SchmData30, log) return() } # Helper function for file writing -write_output_files <- function(stats_01, stats_30, fileData, dirOutStat, SchmData, log) { +write_output_files <- function(stats_01, stats_30, fileData, dirOutStat, SchmData01, SchmData30, log) { if (is.na(fileData)) return() # Create output filenames @@ -340,7 +364,8 @@ write_output_files <- function(stats_01, stats_30, fileData, dirOutStat, SchmDat file_paths <- fs::path(dirOutStat, file_names) datasets <- list(stats_01, stats_30) - + schm_list <- list(SchmData01, SchmData30) + # Write files for (i in seq_along(file_paths)) { rptWrte <- base::try( @@ -348,7 +373,7 @@ write_output_files <- function(stats_01, stats_30, fileData, dirOutStat, SchmDat data = datasets[[i]], NameFile = file_paths[i], NameFileSchm = NULL, - Schm = SchmData, + Schm = schm_list[[i]], log = log ), silent = TRUE diff --git 
a/flow/tests/testthat/pfs/precipPluvioFlags/2025/03/31/precip-weighing_SITE001/pluvio/CFGLOC105245/data/pluvio_CFGLOC105245_2025-03-31.parquet b/flow/tests/testthat/pfs/precipPluvioFlags/2025/03/31/precip-weighing_SITE001/pluvio/CFGLOC105245/data/pluvio_CFGLOC105245_2025-03-31.parquet new file mode 100644 index 000000000..2248d37be Binary files /dev/null and b/flow/tests/testthat/pfs/precipPluvioFlags/2025/03/31/precip-weighing_SITE001/pluvio/CFGLOC105245/data/pluvio_CFGLOC105245_2025-03-31.parquet differ diff --git a/flow/tests/testthat/pfs/precipPluvioFlags/2025/03/31/precip-weighing_SITE001/pluvio/CFGLOC105245/flags/pluvio_CFGLOC105245_2025-03-31_flagsPlausibility.parquet b/flow/tests/testthat/pfs/precipPluvioFlags/2025/03/31/precip-weighing_SITE001/pluvio/CFGLOC105245/flags/pluvio_CFGLOC105245_2025-03-31_flagsPlausibility.parquet new file mode 100644 index 000000000..1d8969938 Binary files /dev/null and b/flow/tests/testthat/pfs/precipPluvioFlags/2025/03/31/precip-weighing_SITE001/pluvio/CFGLOC105245/flags/pluvio_CFGLOC105245_2025-03-31_flagsPlausibility.parquet differ diff --git a/flow/tests/testthat/pfs/precipPluvioStats/2025/03/31/precip-weighing_SITE001/pluvio/CFGLOC105245/data/pluvio_CFGLOC105245_2025-03-31.parquet b/flow/tests/testthat/pfs/precipPluvioStats/2025/03/31/precip-weighing_SITE001/pluvio/CFGLOC105245/data/pluvio_CFGLOC105245_2025-03-31.parquet new file mode 100644 index 000000000..8c96f9b9c Binary files /dev/null and b/flow/tests/testthat/pfs/precipPluvioStats/2025/03/31/precip-weighing_SITE001/pluvio/CFGLOC105245/data/pluvio_CFGLOC105245_2025-03-31.parquet differ diff --git a/flow/tests/testthat/pfs/precipPluvioStats/2025/03/31/precip-weighing_SITE001/pluvio/CFGLOC105245/flags/pluvio_CFGLOC105245_2025-03-31_flagsCal.parquet b/flow/tests/testthat/pfs/precipPluvioStats/2025/03/31/precip-weighing_SITE001/pluvio/CFGLOC105245/flags/pluvio_CFGLOC105245_2025-03-31_flagsCal.parquet new file mode 100644 index 000000000..b00e2aaf8 Binary files /dev/null 
and b/flow/tests/testthat/pfs/precipPluvioStats/2025/03/31/precip-weighing_SITE001/pluvio/CFGLOC105245/flags/pluvio_CFGLOC105245_2025-03-31_flagsCal.parquet differ diff --git a/flow/tests/testthat/pfs/precipPluvioStats/2025/03/31/precip-weighing_SITE001/pluvio/CFGLOC105245/flags/pluvio_CFGLOC105245_2025-03-31_flagsPlausibility.parquet b/flow/tests/testthat/pfs/precipPluvioStats/2025/03/31/precip-weighing_SITE001/pluvio/CFGLOC105245/flags/pluvio_CFGLOC105245_2025-03-31_flagsPlausibility.parquet new file mode 100644 index 000000000..09e08e748 Binary files /dev/null and b/flow/tests/testthat/pfs/precipPluvioStats/2025/03/31/precip-weighing_SITE001/pluvio/CFGLOC105245/flags/pluvio_CFGLOC105245_2025-03-31_flagsPlausibility.parquet differ diff --git a/flow/tests/testthat/pfs/precipPluvioStats/2025/03/31/precip-weighing_SITE001/pluvio/CFGLOC105245/uncertainty_coef/pluvio_CFGLOC105245_2025-03-31_uncertaintyCoef.json b/flow/tests/testthat/pfs/precipPluvioStats/2025/03/31/precip-weighing_SITE001/pluvio/CFGLOC105245/uncertainty_coef/pluvio_CFGLOC105245_2025-03-31_uncertaintyCoef.json new file mode 100644 index 000000000..4b215accf --- /dev/null +++ b/flow/tests/testthat/pfs/precipPluvioStats/2025/03/31/precip-weighing_SITE001/pluvio/CFGLOC105245/uncertainty_coef/pluvio_CFGLOC105245_2025-03-31_uncertaintyCoef.json @@ -0,0 +1,8 @@ +[ + { + "Name": "U_CVALA1", + "Value": "0.02", + "start_date": "2024-01-01T00:00:00Z", + "end_date": "2026-12-31T23:59:59Z" + } +] \ No newline at end of file diff --git a/flow/tests/testthat/test-wrap-precip-pluvio-flags.R b/flow/tests/testthat/test-wrap-precip-pluvio-flags.R new file mode 100644 index 000000000..e76eb66af --- /dev/null +++ b/flow/tests/testthat/test-wrap-precip-pluvio-flags.R @@ -0,0 +1,181 @@ +############################################################################################## +#' @title Unit tests for wrap.precip.pluvio.flags + +#' @author +#' Teresa Burlingame \email{tburlingame@battelleecology.org} + +#' @description 
+#' Run unit tests for wrap.precip.pluvio.flags.R. +#' The tests include positive and negative scenarios. +#' Positive tests verify that the output flags directory is created, that the +#' flagsPlausibility output file contains heaterErrorQF and sensorErrorQF columns, +#' and that the bitwise sensor-status and inlet-temperature logic produces the +#' expected flag values. +#' The negative test verifies that an invalid input directory produces an error. +#' +#' Fixture data expected at: +#' pfs/precipPluvioFlags/2025/03/31/precip-weighing_SITE001/pluvio/CFGLOC105245/ +#' data/ - pluvio_CFGLOC105245_2025-03-31.parquet +#' flags/ - pluvio_CFGLOC105245_2025-03-31_flagsPlausibility.parquet +#' +#' Row key for fixture data: +#' Row 1: sensorStatus=0, inletTemp=5.0 -> sensorErrorQF=0, heaterErrorQF=0 +#' Row 2: sensorStatus=NA, inletTemp=NA -> sensorErrorQF=-1, heaterErrorQF=-1 +#' Row 3: sensorStatus=64, inletTemp=-2.0 -> sensorErrorQF=1, heaterErrorQF=1 +#' Row 4: sensorStatus=128, inletTemp=5.0 -> sensorErrorQF=1, heaterErrorQF=0 +#' Row 5: sensorStatus=256, inletTemp=5.0 -> sensorErrorQF=1, heaterErrorQF=0 +#' Row 6: sensorStatus=512, inletTemp=5.0 -> sensorErrorQF=1, heaterErrorQF=0 +#' Row 7: sensorStatus=1024, inletTemp=5.0 -> sensorErrorQF=1, heaterErrorQF=0 +#' Row 8: sensorStatus=0, inletTemp=-2.0 -> sensorErrorQF=0, heaterErrorQF=1 +#' Row 9: sensorStatus=0, inletTemp=5.0 -> sensorErrorQF=0, heaterErrorQF=0 +#' Row 10: sensorStatus=0, inletTemp=5.0 -> sensorErrorQF=0, heaterErrorQF=0 + +# changelog and author contributions / copyrights +# Teresa Burlingame (2026-05-08) +# Initial creation +############################################################################################## +library(testthat) + +context("\n | Unit test of wrap.precip.pluvio.flags for NEON IS data processing \n") + +test_that("Unit test of wrap.precip.pluvio.flags.R", { + source('../../flow.precip.pluvio.flags/wrap.precip.pluvio.flags.R') + + DirFixture <- file.path(getwd(), 
'pfs/precipPluvioFlags') + DirOutBase <- tempfile(pattern = 'precip_pluvio_flags_test_out') + + # Clean up temp output after test completes + on.exit(if (dir.exists(DirOutBase)) unlink(DirOutBase, recursive = TRUE), add = TRUE) + + DirIn <- file.path(DirFixture, '2025/03/31/precip-weighing_SITE001/pluvio/CFGLOC105245') + + # -------------------------------------------------------------------------------------------- + # Test 1: Happy path - output flags directory is created + # -------------------------------------------------------------------------------------------- + wrap.precip.pluvio.flags( + DirIn = DirIn, + DirOutBase = DirOutBase + ) + + DirOut <- file.path(DirOutBase, '2025/03/31/precip-weighing_SITE001/pluvio/CFGLOC105245') + testthat::expect_true(dir.exists(file.path(DirOut, 'flags'))) + + # -------------------------------------------------------------------------------------------- + # Test 2: Output flagsPlausibility file is written + # -------------------------------------------------------------------------------------------- + out_flags <- list.files(file.path(DirOut, 'flags'), pattern = 'flagsPlausibility\\.parquet$', + full.names = TRUE) + testthat::expect_equal(length(out_flags), 1) + + # -------------------------------------------------------------------------------------------- + # Test 3: Output file contains heaterErrorQF and sensorErrorQF columns + # -------------------------------------------------------------------------------------------- + qfOut <- NEONprocIS.base::def.read.parq.ds( + fileIn = out_flags, + VarTime = 'readout_time', + RmvDupl = TRUE, + Df = TRUE + ) + + testthat::expect_true('heaterErrorQF' %in% names(qfOut)) + testthat::expect_true('sensorErrorQF' %in% names(qfOut)) + + # -------------------------------------------------------------------------------------------- + # Test 4: sensorErrorQF is -1 when sensorStatus is NA (row 2) + # 
-------------------------------------------------------------------------------------------- + testthat::expect_equal(qfOut$sensorErrorQF[2], -1L) + + # -------------------------------------------------------------------------------------------- + # Test 5: sensorErrorQF is 0 when sensorStatus is 0 (row 1) + # -------------------------------------------------------------------------------------------- + testthat::expect_equal(qfOut$sensorErrorQF[1], 0L) + + # -------------------------------------------------------------------------------------------- + # Test 6: sensorErrorQF is 1 when bit 6 (unstable) is set (row 3, sensorStatus=64) + # -------------------------------------------------------------------------------------------- + testthat::expect_equal(qfOut$sensorErrorQF[3], 1L) + + # -------------------------------------------------------------------------------------------- + # Test 7: sensorErrorQF is 1 when bit 7 (defective) is set (row 4, sensorStatus=128) + # -------------------------------------------------------------------------------------------- + testthat::expect_equal(qfOut$sensorErrorQF[4], 1L) + + # -------------------------------------------------------------------------------------------- + # Test 8: sensorErrorQF is 1 when bit 8 (weight < min) is set (row 5, sensorStatus=256) + # -------------------------------------------------------------------------------------------- + testthat::expect_equal(qfOut$sensorErrorQF[5], 1L) + + # -------------------------------------------------------------------------------------------- + # Test 9: sensorErrorQF is 1 when bit 9 (weight > max) is set (row 6, sensorStatus=512) + # -------------------------------------------------------------------------------------------- + testthat::expect_equal(qfOut$sensorErrorQF[6], 1L) + + # -------------------------------------------------------------------------------------------- + # Test 10: sensorErrorQF is 1 when bit 10 (no calibration) is set (row 7, sensorStatus=1024) 
+ # -------------------------------------------------------------------------------------------- + testthat::expect_equal(qfOut$sensorErrorQF[7], 1L) + + # -------------------------------------------------------------------------------------------- + # Test 11: heaterErrorQF is -1 when inletTemp is NA (row 2) + # -------------------------------------------------------------------------------------------- + testthat::expect_equal(qfOut$heaterErrorQF[2], -1L) + + # -------------------------------------------------------------------------------------------- + # Test 12: heaterErrorQF is 0 when inletTemp > 0 (row 1, inletTemp=5.0) + # -------------------------------------------------------------------------------------------- + testthat::expect_equal(qfOut$heaterErrorQF[1], 0L) + + # -------------------------------------------------------------------------------------------- + # Test 13: heaterErrorQF is 1 when inletTemp < 0 and cell is near freezing (row 3, inletTemp=-2) + # -------------------------------------------------------------------------------------------- + testthat::expect_equal(qfOut$heaterErrorQF[3], 1L) + + # -------------------------------------------------------------------------------------------- + # Test 14: heaterErrorQF is 1 when sensorStatus=0 but inletTemp<0 (row 8, inletTemp=-2) + # -------------------------------------------------------------------------------------------- + testthat::expect_equal(qfOut$heaterErrorQF[8], 1L) + + # -------------------------------------------------------------------------------------------- + # Test 15: Original flag columns are preserved in output (nullQF, rangeQF, gapQF) + # -------------------------------------------------------------------------------------------- + testthat::expect_true('nullQF' %in% names(qfOut)) + testthat::expect_true('rangeQF' %in% names(qfOut)) + testthat::expect_true('gapQF' %in% names(qfOut)) + + # 
-------------------------------------------------------------------------------------------- + # Test 16: All flag values are in the expected set {-1, 0, 1} + # -------------------------------------------------------------------------------------------- + testthat::expect_true(all(qfOut$sensorErrorQF %in% c(-1L, 0L, 1L))) + testthat::expect_true(all(qfOut$heaterErrorQF %in% c(-1L, 0L, 1L))) + + # -------------------------------------------------------------------------------------------- + # Test 17: DirSubCopy carries additional directories through to output + # -------------------------------------------------------------------------------------------- + DirOutBase2 <- tempfile(pattern = 'precip_pluvio_flags_test_out2') + on.exit(if (dir.exists(DirOutBase2)) unlink(DirOutBase2, recursive = TRUE), add = TRUE) + + wrap.precip.pluvio.flags( + DirIn = DirIn, + DirOutBase = DirOutBase2, + DirSubCopy = 'data' + ) + + DirOut2 <- file.path(DirOutBase2, '2025/03/31/precip-weighing_SITE001/pluvio/CFGLOC105245') + testthat::expect_true(dir.exists(file.path(DirOut2, 'flags'))) + testthat::expect_true(dir.exists(file.path(DirOut2, 'data'))) + + # -------------------------------------------------------------------------------------------- + # Test 18: Invalid input directory produces an error + # -------------------------------------------------------------------------------------------- + DirIn_bad <- file.path(DirFixture, '2025/03/31/precip-weighing_SITE001/pluvio/CFGLOC_DOES_NOT_EXIST') + + rpt <- try( + wrap.precip.pluvio.flags( + DirIn = DirIn_bad, + DirOutBase = DirOutBase + ), + silent = TRUE + ) + + testthat::expect_true('try-error' %in% class(rpt)) +}) diff --git a/flow/tests/testthat/test-wrap-precip-pluvio-stats.R b/flow/tests/testthat/test-wrap-precip-pluvio-stats.R new file mode 100644 index 000000000..fc6367612 --- /dev/null +++ b/flow/tests/testthat/test-wrap-precip-pluvio-stats.R @@ -0,0 +1,199 @@ 
+############################################################################################## +#' @title Unit tests for wrap.precip.pluvio.stats + +#' @author +#' Teresa Burlingame \email{tburlingame@battelleecology.org} + +#' @description +#' Run unit tests for wrap.precip.pluvio.stats.R. +#' The tests include positive and negative scenarios. +#' Positive tests verify that 1-minute and 30-minute output files are created with the +#' expected columns, that uncertainty is applied correctly, that 30-minute aggregation +#' sums precipitation across 1-minute intervals, and that finalQF reflects the component flags. +#' The negative test verifies that an invalid input directory produces an error. +#' +#' Fixture data expected at: +#' pfs/precipPluvioStats/2025/03/31/precip-weighing_SITE001/pluvio/CFGLOC105245/ +#' data/ - pluvio_CFGLOC105245_2025-03-31.parquet (60 rows, 1 hr) +#' flags/ - pluvio_CFGLOC105245_2025-03-31_flagsCal.parquet +#' (validCalQF, suspectCalQF) +#' pluvio_CFGLOC105245_2025-03-31_flagsPlausibility.parquet +#' (nullQF, rangeQF, gapQF, sensorErrorQF, heaterErrorQF) +#' uncertainty_coef/ - pluvio_CFGLOC105245_2025-03-31_uncertaintyCoef.json +#' (U_CVALA1 = 0.02) + +# changelog and author contributions / copyrights +# Teresa Burlingame (2026-05-08) +# Initial creation +############################################################################################## +library(testthat) + +context("\n | Unit test of wrap.precip.pluvio.stats for NEON IS data processing \n") + +test_that("Unit test of wrap.precip.pluvio.stats.R", { + source('../../flow.precip.pluvio.stats/wrap.precip.pluvio.stats.R') + + DirFixture <- file.path(getwd(), 'pfs/precipPluvioStats') + DirOutBase <- tempfile(pattern = 'precip_pluvio_stats_test_out') + + # Clean up temp output after test completes + on.exit(if (dir.exists(DirOutBase)) unlink(DirOutBase, recursive = TRUE), add = TRUE) + + DirIn <- file.path(DirFixture, '2025/03/31/precip-weighing_SITE001/pluvio/CFGLOC105245') + + # 
-------------------------------------------------------------------------------------------- + # Test 1: Happy path - output stats directory is created + # -------------------------------------------------------------------------------------------- + wrap.precip.pluvio.stats( + DirIn = DirIn, + DirOutBase = DirOutBase, + DirSubCopy = 'location' + ) + + DirOut <- file.path(DirOutBase, '2025/03/31/precip-weighing_SITE001/pluvio/CFGLOC105245') + testthat::expect_true(dir.exists(file.path(DirOut, 'stats'))) + + # -------------------------------------------------------------------------------------------- + # Test 2: Both 1-min and 30-min output files are written + # -------------------------------------------------------------------------------------------- + stats_files <- list.files( + file.path(DirOut, 'stats'), + pattern = '_stats_(001|030)\\.parquet$', + full.names = TRUE + ) + + testthat::expect_equal(length(stats_files), 2) + + file_001 <- stats_files[grepl('_stats_001\\.parquet$', stats_files)] + file_030 <- stats_files[grepl('_stats_030\\.parquet$', stats_files)] + testthat::expect_equal(length(file_001), 1) + testthat::expect_equal(length(file_030), 1) + + # -------------------------------------------------------------------------------------------- + # Test 3: 1-min output schema contains all expected columns + # -------------------------------------------------------------------------------------------- + stats_001 <- NEONprocIS.base::def.read.parq.ds( + fileIn = file_001, + VarTime = 'startDateTime', + RmvDupl = TRUE, + Df = TRUE + ) + + expected_cols <- c( + 'startDateTime', 'endDateTime', + 'precipBulk', 'precipBulkExpUncert', 'precipNumPts', + 'nullQF', 'gapQF', 'extremePrecipQF', + 'heaterErrorQF', 'sensorErrorQF', + 'validCalQF', 'suspectCalQF', 'finalQF' + ) + + testthat::expect_true(all(expected_cols %in% names(stats_001))) + + # -------------------------------------------------------------------------------------------- + # Test 4: 30-min output 
schema contains all expected columns including insuffDataQF + # -------------------------------------------------------------------------------------------- + stats_030 <- NEONprocIS.base::def.read.parq.ds( + fileIn = file_030, + VarTime = 'startDateTime', + RmvDupl = TRUE, + Df = TRUE + ) + + expected_cols_030 <- c(expected_cols[1:5], 'insuffDataQF', expected_cols[6:length(expected_cols)]) + testthat::expect_true(all(expected_cols_030 %in% names(stats_030))) + + # insuffDataQF must not appear in 1-min output + testthat::expect_false('insuffDataQF' %in% names(stats_001)) + + # -------------------------------------------------------------------------------------------- + # Test 5: 30-min output has fewer rows than 1-min output + # -------------------------------------------------------------------------------------------- + testthat::expect_lt(nrow(stats_030), nrow(stats_001)) + + # -------------------------------------------------------------------------------------------- + # Test 6: precipBulk values are non-negative + # -------------------------------------------------------------------------------------------- + testthat::expect_true(all(stats_001$precipBulk >= 0, na.rm = TRUE)) + testthat::expect_true(all(stats_030$precipBulk >= 0, na.rm = TRUE)) + + # -------------------------------------------------------------------------------------------- + # Test 7: precipBulkExpUncert meets the minimum 0.1 mm manufacturer accuracy spec + # -------------------------------------------------------------------------------------------- + testthat::expect_true(all(stats_001$precipBulkExpUncert >= 0.1, na.rm = TRUE)) + + # -------------------------------------------------------------------------------------------- + # Test 8: 30-min precipBulk is the sum of the corresponding 1-min precipBulk values + # -------------------------------------------------------------------------------------------- + # First 30-min interval covers rows 1-30 of 1-min stats + first_30_sum <- 
sum(stats_001$precipBulk[1:30], na.rm = TRUE) + testthat::expect_equal(stats_030$precipBulk[1], first_30_sum, tolerance = 1e-8) + + # -------------------------------------------------------------------------------------------- + # Test 9: finalQF is 0 when all component flags are 0 (fixture has no errors) + # -------------------------------------------------------------------------------------------- + testthat::expect_true(all(stats_001$finalQF == 0L, na.rm = TRUE)) + testthat::expect_true(all(stats_030$finalQF == 0L, na.rm = TRUE)) + + # -------------------------------------------------------------------------------------------- + # Test 10: Quality flag values are within the expected set {-1, 0, 1} + # -------------------------------------------------------------------------------------------- + qf_cols_001 <- c('nullQF', 'gapQF', 'extremePrecipQF', 'heaterErrorQF', + 'sensorErrorQF', 'finalQF') + qf_cols_030 <- c(qf_cols_001, 'insuffDataQF') + + for (col in qf_cols_001) { + testthat::expect_true( + all(stats_001[[col]] %in% c(-1L, 0L, 1L), na.rm = TRUE), + info = paste0('Column ', col, ' in 1-min output has unexpected values') + ) + } + for (col in qf_cols_030) { + testthat::expect_true( + all(stats_030[[col]] %in% c(-1L, 0L, 1L), na.rm = TRUE), + info = paste0('Column ', col, ' in 30-min output has unexpected values') + ) + } + + # -------------------------------------------------------------------------------------------- + # Test 10b: insuffDataQF is 0 when precipNumPts == 30 (fixture has complete data) + # -------------------------------------------------------------------------------------------- + testthat::expect_true(all(stats_030$insuffDataQF == 0L, na.rm = TRUE)) + + # -------------------------------------------------------------------------------------------- + # Test 11: precipNumPts equals 1 for each non-NA 1-min row and sums to 30 per 30-min interval + # -------------------------------------------------------------------------------------------- + 
testthat::expect_true(all(stats_001$precipNumPts %in% c(0L, 1L))) + testthat::expect_equal(stats_030$precipNumPts[1], 30L) + + # -------------------------------------------------------------------------------------------- + # Test 12: DirSubCopy carries additional directories through to output + # -------------------------------------------------------------------------------------------- + DirOutBase2 <- tempfile(pattern = 'precip_pluvio_stats_test_out2') + on.exit(if (dir.exists(DirOutBase2)) unlink(DirOutBase2, recursive = TRUE), add = TRUE) + + wrap.precip.pluvio.stats( + DirIn = DirIn, + DirOutBase = DirOutBase2, + DirSubCopy = c('flags', 'uncertainty_coef') + ) + + DirOut2 <- file.path(DirOutBase2, '2025/03/31/precip-weighing_SITE001/pluvio/CFGLOC105245') + testthat::expect_true(dir.exists(file.path(DirOut2, 'stats'))) + testthat::expect_true(dir.exists(file.path(DirOut2, 'flags'))) + testthat::expect_true(dir.exists(file.path(DirOut2, 'uncertainty_coef'))) + + # -------------------------------------------------------------------------------------------- + # Test 13: Invalid input directory produces an error + # -------------------------------------------------------------------------------------------- + DirIn_bad <- file.path(DirFixture, '2025/03/31/precip-weighing_SITE001/pluvio/CFGLOC_DOES_NOT_EXIST') + + rpt <- try( + wrap.precip.pluvio.stats( + DirIn = DirIn_bad, + DirOutBase = DirOutBase + ), + silent = TRUE + ) + + testthat::expect_true('try-error' %in% class(rpt)) +}) diff --git a/pipe/pluvio/pluvio_calibration_assignment.yaml b/pipe/pluvio/pluvio_calibration_assignment.yaml index 8a09f0a62..1cc93e5c3 100644 --- a/pipe/pluvio/pluvio_calibration_assignment.yaml +++ b/pipe/pluvio/pluvio_calibration_assignment.yaml @@ -18,7 +18,7 @@ input: cross: - pfs: name: DIR_IN - repo: pluvio_calibration_loader_test_files + repo: pluvio_calibration_loader glob: /*/* - pfs: name: FILE_YEAR diff --git a/pipe/pluvio/pluvio_calibration_loader.yaml 
b/pipe/pluvio/pluvio_calibration_loader.yaml index 01631eb2d..1dc1e866b 100644 --- a/pipe/pluvio/pluvio_calibration_loader.yaml +++ b/pipe/pluvio/pluvio_calibration_loader.yaml @@ -4,31 +4,16 @@ pipeline: transform: image: us-central1-docker.pkg.dev/neon-shared-service/neonscience/neon-is-cval-loader:v3.0.0 cmd: - - sh - - "-c" - - |- - /bin/bash <<'EOF' - # Use bash-scrict mode. See http://redsymbol.net/articles/unofficial-bash-sstrict-mode/ - set -euo pipefail - IFS=$'\n\t' - - # Refresh interim directories with each datum (otherwise they persist and cause probs) - rm -rf $OUT_PATH - mkdir -p $OUT_PATH # R modules must have pfs in the repo structure - - python3 -m calval_loader.load_all_calval_files #run the calibration loader - - if [[ -d "$OUT_PATH/$SOURCE_TYPE" ]]; then - cp -r $OUT_PATH/$SOURCE_TYPE /pfs/out/$SOURCE_TYPE_OUT - fi - - EOF + - /bin/bash + stdin: + - '#!/bin/bash' + - python3 -m calval_loader.load_all_calval_files env: CVAL_INGEST_BUCKET: neon-cval - OUT_PATH: /tmp/out + OUT_PATH: /pfs/out LOG_LEVEL: INFO - SOURCE_TYPE: "pluvio_raw" - SOURCE_TYPE_OUT: "pluvio" + SOURCE_TYPE: "pluvio" + SCHEMA_NAME: "pluvio_raw" STARTING_PATH_INDEX: "5" secrets: - name: pdr-secret diff --git a/pipe/pluvio/pluvio_cron_daily_and_date_control.yaml b/pipe/pluvio/pluvio_cron_daily_and_date_control.yaml index af1e430bd..043204f1d 100644 --- a/pipe/pluvio/pluvio_cron_daily_and_date_control.yaml +++ b/pipe/pluvio/pluvio_cron_daily_and_date_control.yaml @@ -10,8 +10,8 @@ transform: # NOTE that the pluvio has different site-dates for kafka and trino loaders. # This is because some kafka topics start with dfir instead of simply , which the trino loader cannot handle LAG_DAYS_END: "2" # Default is 2. Don't go lower than 2. 
- START_DATE: "2026-02-22" - #END_DATE: "2024-06-01" + START_DATE: "2026-02-27" + END_DATE: "2026-04-08" OUT_PATH_KAFKA: /pfs/out/kafka OUT_PATH_TRINO: /pfs/out/trino SOURCE_TYPE: "pluvio_raw" @@ -41,7 +41,8 @@ input: # This cron is the central driver for daily scheduled updates, such as data ingest and metadata loaders. - cron: name: tick - spec: "0 7 * * *" # Run at 00:00 MST (07:00 GMT) + #spec: "0 7 * * *" # Run at 00:00 MST (07:00 GMT) + spec: "@never" overwrite: true - pfs: name: SITE_FILE_KAFKA diff --git a/pipe/precipWeighingv2/pipe_list_precipWeighingv2.txt b/pipe/precipWeighingv2/pipe_list_precipWeighingv2.txt index 5e5ffb148..43a827124 100644 --- a/pipe/precipWeighingv2/pipe_list_precipWeighingv2.txt +++ b/pipe/precipWeighingv2/pipe_list_precipWeighingv2.txt @@ -7,4 +7,8 @@ precipWeighingv2_threshold.yaml precipWeighingv2_thresh_select_ts_pad.yaml precipWeighingv2_analyze_pad_and_qaqc.yaml precipWeighingv2_qm_stats_group_and_compute.yaml - +precipWeighingv2_level1_group_consolidate_srf.yaml +precipWeighingv2_cron_monthly_and_pub_control.yaml +precipWeighingv2_pub_group.yaml +precipWeighingv2_pub_format_and_package.yaml +precipWeighingv2_pub_egress_and_publish.yaml \ No newline at end of file diff --git a/pipe/precipWeighingv2/precipWeighingv2_analyze_pad_and_qaqc.yaml b/pipe/precipWeighingv2/precipWeighingv2_analyze_pad_and_qaqc.yaml index a5297dc6a..f7aec9f15 100644 --- a/pipe/precipWeighingv2/precipWeighingv2_analyze_pad_and_qaqc.yaml +++ b/pipe/precipWeighingv2/precipWeighingv2_analyze_pad_and_qaqc.yaml @@ -4,7 +4,7 @@ pipeline: transform: image_pull_secrets: - battelleecology-quay-read-all-pull-secret - image: us-central1-docker.pkg.dev/neon-shared-service/neonscience/neon-is-pluvio-flgs:v0.0.3 + image: us-central1-docker.pkg.dev/neon-shared-service/neonscience/neon-is-pluvio-flgs:sha-c5d15db cmd: - sh - "-c" @@ -34,7 +34,6 @@ transform: DirIn=/tmp/pfs/padded_analyzerCopy \ DirOut=/tmp/pfs/plau \ DirErr=/pfs/out/errored_datums \ - 
FileSchmQf=$SCHEMA_FLAGS \ "TermTest1=accu_nrt:null|gap|range(rmv)" # Run third module - custom flags @@ -60,10 +59,6 @@ input: name: DATA_PATH repo: precipWeighingv2_thresh_select_ts_pad glob: /*/*/* - - pfs: - name: SCHEMA_FLAGS - repo: precipWeighingv2_avro_schemas - glob: /precipWeighingv2/flags_plausibility_precipWeighingv2.avsc - pfs: name: SCHEMA_FLAGS_CUST repo: precipWeighingv2_avro_schemas diff --git a/pipe/precipWeighingv2/precipWeighingv2_cron_monthly_and_pub_control.yaml b/pipe/precipWeighingv2/precipWeighingv2_cron_monthly_and_pub_control.yaml new file mode 100644 index 000000000..db4ddde9c --- /dev/null +++ b/pipe/precipWeighingv2/precipWeighingv2_cron_monthly_and_pub_control.yaml @@ -0,0 +1,36 @@ +--- +pipeline: + name: precipWeighingv2_cron_monthly_and_pub_control +transform: + image: us-central1-docker.pkg.dev/neon-shared-service/neonscience/neon-is-pub-cntl:v1.1.0 + cmd: ["/bin/bash"] + env: + # START_MONTH and END_MONTH indicate the date range (inclusive) to create the /Y/M folder structure + # START_MONTH must be set, format "YYYY-MM" + # END_MONTH can be set or unset (comment or remove line to unset). If unset, end month will be last month. + OUT_PATH: /pfs/out + START_MONTH: "2026-03" + END_MONTH: "2026-03" # Inclusive. 
Run the pipeline with END_MONTH set to initialize, then comment out and update pipeline (no reprocess) to let the cron take over + stdin: + - "#!/bin/bash" + - ./cron_monthly_and_pub_control/populate_pub_months.sh +input: + # Choose a monthly cron date to be something sufficiently after the 1st to allow kafka lag and timeseries pad + cron: + name: tick + spec: "@never" + #spec: "0 7 9 * *" # Run at 00:00 MST (07:00 GMT) on the 9th of the month + overwrite: true +autoscaling: true +resource_requests: + memory: 64M + cpu: 0.1 +resource_limits: + memory: 200M + cpu: 1 +sidecar_resource_requests: + memory: 200M + cpu: 0.1 +scheduling_spec: + node_selector: + cloud.google.com/compute-class: pach-pipeline-class \ No newline at end of file diff --git a/pipe/precipWeighingv2/precipWeighingv2_level1_group_consolidate_srf.yaml b/pipe/precipWeighingv2/precipWeighingv2_level1_group_consolidate_srf.yaml index 78179bda4..c080a06f0 100644 --- a/pipe/precipWeighingv2/precipWeighingv2_level1_group_consolidate_srf.yaml +++ b/pipe/precipWeighingv2/precipWeighingv2_level1_group_consolidate_srf.yaml @@ -170,7 +170,7 @@ transform: PARALLELIZATION_INTERNAL: '2' # Environment variables for the L1 archiver - GROUP_PREFIX: par-quantum-line # no ending "_" + GROUP_PREFIX: precip-weighing-v2 # no ending "_" secrets: - name: pdr-secret @@ -183,7 +183,7 @@ input: join: - pfs: name: STATISTICS_PATH - repo: precipWeighingv2_stats_group_and_compute + repo: precipWeighingv2_qm_stats_group_and_compute glob: /(*/*/*) joinOn: $1 outer_join: true # Need outer join to pull in with or without SRFs diff --git a/pipe/precipWeighingv2/precipWeighingv2_pub_egress_and_publish.yaml b/pipe/precipWeighingv2/precipWeighingv2_pub_egress_and_publish.yaml new file mode 100644 index 000000000..397dabe10 --- /dev/null +++ b/pipe/precipWeighingv2/precipWeighingv2_pub_egress_and_publish.yaml @@ -0,0 +1,173 @@ +--- +pipeline: + name: precipWeighingv2_pub_egress_and_publish +transform: + image: 
us-central1-docker.pkg.dev/neon-shared-service/neonscience/neon-is-pub-egrs-publ:v5.0.0 + cmd: + - sh + - "-c" + - |- + /bin/bash <<'EOF' + # Use bash-strict mode. See http://redsymbol.net/articles/unofficial-bash-strict-mode/ + set -euo pipefail + IFS=$'\n\t' + + curl -o $OUT_MDP_SITES https://raw.githubusercontent.com/NEONScience/NEON-IS-data-processing-inputs/refs/heads/main/mdp_sites_list.txt + + # Run first module - pub_egress (using environment variables below as input parameters) + if [[ $(echo $DATA_PATH) ]]; then + python3 -m pub_egress.pub_egress_main + fi + # If there is output, egress it + if ls $OUT_PATH/NEON.DOM.SITE* 1> /dev/null 2>&1; then + for DIR in $OUT_PATH/NEON.DOM.SITE*; do + echo "Starting non-MDP sites==================" + echo "Syncing $DIR to bucket $BUCKET_NAME" + # Parse the product + [[ "$DIR" =~ ^$OUT_PATH/(.*)$ ]] + PRODUCT="${BASH_REMATCH[1]}" + echo "PRODUCT is $PRODUCT" + rclone \ + --no-check-dest \ + --copy-links \ + --gcs-bucket-policy-only \ + --gcs-no-check-bucket \ + copy \ + "${OUT_PATH}/${PRODUCT}" \ + ":gcs://${BUCKET_NAME}/${PRODUCT}" + done + echo "============ Done for non-MDP sites" + else + echo "No pub output to egress" + fi + + # + # Do the same for MDP sites if MDP sites exist in the output + # Check to see if the output needs to be sent to the staging or not + # For example, BUCKET_NAME_MDP: neon-aa-dev-md03-staging/Publication for staging SITE=MD03 + # Read mdp_site_list from githubusercontent + # + if ls $OUT_PATH_MDP/NEON.DOM.SITE* 1> /dev/null 2>&1; then + for DIR in $OUT_PATH_MDP/NEON.DOM.SITE*; do + echo "=" + echo "Starting MDP sites==================" + # Parse the product + [[ "$DIR" =~ ^$OUT_PATH_MDP/(.*)$ ]] + PRODUCT="${BASH_REMATCH[1]}" + echo "PRODUCT is $PRODUCT" + for DIR_SUB in $DIR/MD*; do + echo "DIR is $DIR" + echo "DIR_SUB is $DIR_SUB" + # Parse the site + [[ "$DIR_SUB" =~ ^$DIR/(.*)$ ]] + SITE="${BASH_REMATCH[1]}" + # to change to lowercase in case + # export site="${SITE,,}" + # + while 
read -r mdpsite prod staging bucket_name + do + if [[ $SITE == $mdpsite ]] && [[ $prod == $PROD ]] && [[ $staging == $STAGING ]]; then + BUCKET_NAME_MDP=$bucket_name + echo "$mdpsite products to $bucket_name bucket" + else echo "**** No products available for $mdpsite to $bucket_name bucket" + fi + done < $OUT_MDP_SITES + echo "Syncing $SITE products directory $DIR to mdp bucket $BUCKET_NAME_MDP" + done + rclone \ + --no-check-dest \ + --copy-links \ + --gcs-bucket-policy-only \ + --gcs-no-check-bucket \ + copy \ + "${OUT_PATH_MDP}/${PRODUCT}" \ + ":gcs://${BUCKET_NAME_MDP}/${PRODUCT}" + done + echo "============ Done for MDP sites" + #cp -f "$OUT_MDP_SITES" $OUT_PATH_MDP/mdp_sites.txt + else + echo "No MDP pub output to egress" + fi + + # Run second module - pub_upload (using environment variables below as input parameters) + echo "run pub uploader ..." + export DATA_PATH=$OUT_PATH + python3 -m pub_uploader.pub_uploader_main + # Run third module - pub_sync (using environment variables below as input parameters) + echo "run pub sync sites ..." + python3 -m pub_sync.pub_sync_main + EOF + env: + LOG_LEVEL: INFO + + # Environment variables for 1st module: pub_egress. The pub bucket and egress url are specified via secrets below. + OUT_PATH: "/pfs/out" + OUT_PATH_MDP: "/pfs/out/mdp" + OUT_MDP_SITES: "/tmp/mdp_sites.txt" + # ERR_PATH can be changed, it is user specified + ERR_PATH: /pfs/out/errored_datums + STARTING_PATH_INDEX: "2" # starting path index to process pub packages. Use "2" to process the whole repo with path structure /pfs/repo_name/... + PROD: "false" # false for non-prod, true for prod + STAGING: "true" # The default is true. + + # Environment variables for 2nd module: pub_upload. + # DATA_PATH is set in the code above to the output from the egress module + # Uses STARTING_PATH_INDEX above + VERSION: 'pachyderm_v1' + CHANGE_BY: pachyderm + + # Environment variables for 3rd module: pub_sync. + # Uses DATE_PATH from input spec. 
DATA_PATH is set in the code above to the output from the egress module + # Uses CHANGE_BY above + DATE_PATH_YEAR_INDEX: "3" + DATE_PATH_MONTH_INDEX: "4" + DATA_PATH_PRODUCT_INDEX: "3" + DATA_PATH_SITE_INDEX: "4" + DATA_PATH_DATE_INDEX: "5" + DATA_PATH_PACKAGE_INDEX: "6" + PRODUCTS: NEON.DOM.SITE.DP1.00044.002 # CAN BE MULTIPLE, COMMA-SEPARATED + SITES: "all" # CAN BE MULTIPLE, COMMA-SEPARATED array of NEON site codes. "all" will find all sites with pub records in the database. + + secrets: + - name: pdr-secret + mount_path: /var/db_secret + - name: pub-bucket + env_var: BUCKET_NAME + key: BUCKET_NAME + - name: pub-bucket + env_var: EGRESS_URL + key: EGRESS_URL + +input: + group: + - join: + - pfs: + name: DATA_PATH + repo: precipWeighingv2_pub_format_and_package + # Glob must be at each intended pub datum (i.e. each site/year/month), grouped by month + glob: /*/*/(*/*) + joinOn: $1 + group_by: $1 + - pfs: + name: DATE_PATH + repo: precipWeighingv2_cron_monthly_and_pub_control + glob: /(*/*) + joinOn: $1 + outer_join: True # We want to run even if no data so pub_sync runs + group_by: $1 + empty_files: true +autoscaling: true +resource_requests: + memory: 500M + cpu: .5 +resource_limits: + memory: 1G + cpu: 1.3 +sidecar_resource_requests: + memory: 3G + cpu: 1.3 +datum_set_spec: + number: 1 +scheduling_spec: + node_selector: + cloud.google.com/compute-class: pach-pipeline-class diff --git a/pipe/precipWeighingv2/precipWeighingv2_pub_format_and_package.yaml b/pipe/precipWeighingv2/precipWeighingv2_pub_format_and_package.yaml new file mode 100644 index 000000000..1d58b3df9 --- /dev/null +++ b/pipe/precipWeighingv2/precipWeighingv2_pub_format_and_package.yaml @@ -0,0 +1,123 @@ +--- +pipeline: + name: precipWeighingv2_pub_format_and_package +transform: + image: us-central1-docker.pkg.dev/neon-shared-service/neonscience/neon-is-pub-grp-pack:v4.2.0 + cmd: + - sh + - "-c" + - |- + /bin/bash <<'EOF' + # Use bash-strict mode. 
See http://redsymbol.net/articles/unofficial-bash-strict-mode/ + set -euo pipefail + IFS=$'\n\t' + + # Refresh interim directories with each datum (otherwise they persist and cause probs) + rm -rf $OUT_PATH_TRANSFORMER + rm -rf $OUT_PATH_PACKAGER + mkdir $OUT_PATH_TRANSFORMER + mkdir $OUT_PATH_PACKAGER + + # Set some environment variables for the pub transformer module + export DATA_PATH=$GROUPED_PATH + export OUT_PATH=$OUT_PATH_TRANSFORMER + + # Run pub_workbook_loader to load pub workbooks for pub_transformer and os_table_loader. + python3 -m pub_workbook_loader.pub_workbook_loader_main + + # Run pub_transformer (using environment variables below as input parameters) + python3 -m pub_transformer.pub_transformer_main + + # Run pub_packager. Packager needs to be run at monthly glob. Get those paths. + export OUT_PATH=$OUT_PATH_PACKAGER + product_month_paths="${OUT_PATH_TRANSFORMER}/*/*/*" + for path in $product_month_paths; do + echo "Processing product-month path $path" + export DATA_PATH=$path + python3 -m pub_packager.pub_packager_main + done + + # Clean up after pub_transformer. + rm -rf $OUT_PATH_TRANSFORMER + + # Run pub_files. + export OUT_PATH=$OUT_PATH_PUBFILES + export IN_PATH=$OUT_PATH_PACKAGER + export LOCATION_PATH=$GROUPED_PATH + python3 -m pub_files.main + + EOF + env: + # Environment variables for 2nd (part A) module: pub_workbook_loader. + OUT_PATH_WORKBOOK: "/tmp/pub_workbooks" + PRODUCTS: NEON.DOM.SITE.DP1.00044.002 # Format: NEON.DOM.SITE.DPX.XXXXX.XXX,NEON.DOM.SITE.DPX.XXXXX.XXX,etc + + # Environment variables for 2nd module (part B): pub_transformer. + LOG_LEVEL: INFO + PRODUCT_INDEX: '3' # input path index of the data product identifier. Also shared with pub_packager. + YEAR_INDEX: '4' + MONTH_INDEX: '5' + DAY_INDEX: '7' + DATA_TYPE_INDEX: '8' + GROUP_METADATA_DIR: group + DATA_PATH_PARSE_INDEX: '2' + OUT_PATH_TRANSFORMER: "/tmp/pub_transformer" + WORKBOOK_PATH: "/tmp/pub_workbooks" + + # Environment variables for 3rd module: pub_packager. 
Also uses PRODUCT_INDEX from pub_transformer. + OUT_PATH_PACKAGER: "/tmp/pub_packager" + ERR_PATH_PACKAGER: "/pfs/out/packager/errored_datums" + PUBLOC_INDEX: '6' # input path index of the pub package location (typically the site) + DATE_INDEX: '4' # Starting index of date in path (i.e. year index) + DATE_INDEX_LENGTH: '2' # length of date index for pub package (should be 2 for monthly) + SORT_INDEX: '10' # File name index corresponding to date field (delimiter = .) + + # Environment variables for 4th module: pub_files. + OUT_PATH_PUBFILES: "/pfs/out" + RELATIVE_PATH_INDEX: '3' + DB_SECRETS_PATH: /var/db_secret + GITHUB_PEM_PATH: /var/github_secret/key + GITHUB_APP_ID: '300002' + GITHUB_INSTALLATION_ID: '34765458' + GITHUB_HOST: https://api.github.com + GITHUB_REPO_OWNER: NEONScience + GITHUB_README_REPO: neon-metadata-docs + GITHUB_README_PATH: readme/template.j2 + GITHUB_EML_REPO: neon-metadata-docs + GITHUB_EML_BOILERPLATE_PATH: eml/neon_components/NEON_EML_Boilerplate.xml + GITHUB_EML_CONTACT_PATH: eml/neon_components/neon_contact.xml + GITHUB_EML_INTELLECTUAL_RIGHTS_PATH: eml/neon_components/neon_intellectualRights.xml + GITHUB_EML_UNIT_TYPES_PATH: eml/neon_components/neon_unitTypes.xml + GITHUB_EML_UNITS_PATH: eml/neon_components/NEON_units.txt + GITHUB_BRANCH: main + + secrets: + - name: pdr-secret + mount_path: /var/db_secret + - name: github-neonscience-app-secret + mount_path: /var/github_secret + +input: + pfs: + name: GROUPED_PATH + repo: precipWeighingv2_pub_group + # Glob must be product-monthly or product-site-monthly. Product-site-month datums reduce unnecessary republication. + # path structure is e.g. 
DP1.00098.001/2023/04/CPER/04 (product/year/month/site/day) + glob: /*/*/*/* +parallelism_spec: + constant: 5 +autoscaling: true +resource_requests: + memory: 400M + cpu: 1.2 +resource_limits: + memory: 800M + cpu: 1.2 +sidecar_resource_requests: + memory: 2G + cpu: 0.4 +datum_set_spec: + number: 5 +scheduling_spec: + node_selector: + cloud.google.com/compute-class: pach-pipeline-class \ No newline at end of file diff --git a/pipe/precipWeighingv2/precipWeighingv2_pub_group.yaml b/pipe/precipWeighingv2/precipWeighingv2_pub_group.yaml new file mode 100644 index 000000000..bfaf51178 --- /dev/null +++ b/pipe/precipWeighingv2/precipWeighingv2_pub_group.yaml @@ -0,0 +1,53 @@ +--- +pipeline: + name: precipWeighingv2_pub_group +transform: +# image_pull_secrets: [battelleecology-quay-read-all-pull-secret] + image: us-central1-docker.pkg.dev/neon-shared-service/neonscience/neon-is-pub-grp-pack:v4.2.0 + cmd: ["/bin/bash"] + stdin: + - "#!/bin/bash" + - '# Run first module - pub_grouper (using environment variables below as input parameters)' + - python3 -m pub_grouper.pub_grouper_main + env: + # Environment variables for 1st module: pub_grouper. 
+ LOG_LEVEL: INFO + OUT_PATH: "/pfs/out" + ERR_PATH_GROUPER: "/pfs/out/errored_datums" # Absolute path under OUT_PATH (a relative path would resolve against the container working directory) + YEAR_INDEX: '3' + GROUP_INDEX: '6' + DATA_TYPE_INDEX: '7' # Also shared with pub_transform + GROUP_METADATA_DIR: group + PUBLOC_KEY: site + LINK_TYPE: SYMLINK + +input: + join: + - pfs: + name: DATA_PATH + repo: precipWeighingv2_level1_group_consolidate_srf + # Glob should be monthly and joined with pub_control to hold pub until month is likely complete + glob: /(*/*) + joinOn: $1 + - pfs: + repo: precipWeighingv2_cron_monthly_and_pub_control + glob: /(*/*) + joinOn: $1 + empty_files: true +parallelism_spec: + constant: 2 +autoscaling: true +resource_requests: + memory: 1.8G + cpu: 1 +resource_limits: + memory: 2.5G + cpu: 1.5 +sidecar_resource_requests: + memory: 4G + cpu: 1 +datum_set_spec: + number: 1 +scheduling_spec: + node_selector: + cloud.google.com/compute-class: pach-pipeline-class \ No newline at end of file diff --git a/pipe/precipWeighingv2/precipWeighingv2_qm_stats_group_and_compute.yaml b/pipe/precipWeighingv2/precipWeighingv2_qm_stats_group_and_compute.yaml index b6b078c84..146b4c901 100644 --- a/pipe/precipWeighingv2/precipWeighingv2_qm_stats_group_and_compute.yaml +++ b/pipe/precipWeighingv2/precipWeighingv2_qm_stats_group_and_compute.yaml @@ -2,7 +2,7 @@ pipeline: name: precipWeighingv2_qm_stats_group_and_compute transform: - image: us-central1-docker.pkg.dev/neon-shared-service/neonscience/neon-is-pluvio-stats:v0.0.3 + image: us-central1-docker.pkg.dev/neon-shared-service/neonscience/neon-is-pluvio-stats:sha-c5d15db cmd: - sh - "-c" diff --git a/pipe/precipWeighingv2/precipWeighingv2_stats_group_and_compute.yaml b/pipe/precipWeighingv2/precipWeighingv2_stats_group_and_compute.yaml deleted file mode 100644 index 858092c94..000000000 --- a/pipe/precipWeighingv2/precipWeighingv2_stats_group_and_compute.yaml +++ /dev/null @@ -1,101 +0,0 @@ ---- -pipeline: - name: precipWeighingv2_stats_group_and_compute -transform: - image: 
us-central1-docker.pkg.dev/neon-shared-service/neonscience/neon-is-stat-basc-grp:v2.0.4 - cmd: - - sh - - "-c" - - |- - /bin/bash <<'EOF' - # Use bash-scrict mode. See http://redsymbol.net/articles/unofficial-bash-strict-mode/ - set -euo pipefail - IFS=$'\n\t' - # Refresh interim directories with each datum (otherwise they persist and cause probs) - rm -r -f /tmp/pfs/filter_joined - mkdir -p /tmp/pfs/filter_joined - # Run first module - filter-joiner (using environment variables below as input parameters) - python3 -m filter_joiner.filter_joiner_main - # Run second module - basic stats - Rscript ./flow.stat.basc.R \ - DirIn=/tmp/pfs/filter_joined \ - DirOut=/pfs/out \ - DirErr=/pfs/out/errored_datums \ - FileSchmStat=$FILE_SCHEMA_STATS \ - "WndwAgr=030" \ - "TermStat1=accu_nrt:sum|numPts|expUncert(wrap.ucrt.dp01.cal.cnst)" - - Rscript ./flow.stat.basc.R \ - DirIn=/tmp/pfs/filter_joined \ - DirOut=/pfs/out \ - DirErr=/pfs/out/errored_datums \ - FileSchmStat=$FILE_SCHEMA_STATS_INST \ - "WndwAgr=001" \ - "TermStat1=accu_nrt:sum|expUncert(wrap.ucrt.dp01.cal.cnst)" - EOF - env: - # Environment variables for filter-joiner - CONFIG: | - --- - # In Pachyderm root will be index 0, 'pfs' index 1, and the repo name index 2. - # Metadata indices will typically begin at index 3. - # Use unix-style glob pattern to select the desired directories in each repo - input_paths: - - path: - name: QAQC_PATH - # Filter for data & uncertainty_data directories - glob_pattern: /pfs/QAQC_PATH/*/*/*/*/*/*/data/** - # Join on named location (already joined below by day) - join_indices: [6] - - path: - name: UNCERTAINTY_PATH - # Filter for data directory - glob_pattern: /pfs/UNCERTAINTY_PATH/*/*/*/*/*/*/uncertainty*/** - # Join on named location (already joined below by day) - join_indices: [6] - OUT_PATH: /tmp/pfs/filter_joined - LOG_LEVEL: INFO - RELATIVE_PATH_INDEX: "3" - LINK_TYPE: COPY # options are COPY or SYMLINK. Use COPY for combined module. 
- # Environment variables for calibration module - PARALLELIZATION_INTERNAL: '5' # Option for stats module -input: - cross: - - pfs: - name: FILE_SCHEMA_STATS - repo: precipWeighingv2_avro_schemas - glob: /precipWeighingv2/precipWeighingv2_dp01_stats_pluvio.avsc - - pfs: - name: FILE_SCHEMA_STATS_INST - repo: precipWeighingv2_avro_schemas - glob: /precipWeighingv2/precipWeighingv2_dp01_stats_pluvio_inst.avsc - - join: - - pfs: - name: QAQC_PATH - repo: precipWeighingv2_analyze_pad_and_qaqc - glob: /(*/*/*) - joinOn: $1 - empty_files: false # Make sure this is false for LINK_TYPE=COPY - - pfs: - name: UNCERTAINTY_PATH - repo: precipWeighingv2_group_path - glob: /(*/*/*) - joinOn: $1 - empty_files: false # Make sure this is false for LINK_TYPE=COPY -parallelism_spec: - constant: 5 -resource_requests: - memory: 1.8G - cpu: 6 -resource_limits: - memory: 3G - cpu: 7 -sidecar_resource_requests: - memory: 3G - cpu: 0.5 -autoscaling: true -datum_set_spec: - number: 1 -scheduling_spec: - node_selector: - cloud.google.com/compute-class: pach-pipeline-class