1010//! ```sql
1111//! CREATE TABLE ow_tasks (
1212//! id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
13- //! script TEXT,
14- //! code TEXT,
13+ //! script TEXT NOT NULL,
1514//! payload JSONB,
1615//! status TEXT NOT NULL DEFAULT 'pending',
1716//! result JSONB,
1817//! error TEXT,
1918//! created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
2019//! started_at TIMESTAMPTZ,
21- //! completed_at TIMESTAMPTZ,
22- //! CONSTRAINT valid_source CHECK (script IS NOT NULL OR code IS NOT NULL)
20+ //! completed_at TIMESTAMPTZ
2321//! );
2422//!
2523//! CREATE INDEX idx_ow_tasks_pending ON ow_tasks(created_at) WHERE status = 'pending';
2624//!
2725//! CREATE OR REPLACE FUNCTION notify_ow_task_created() RETURNS TRIGGER AS $$
2826//! BEGIN
29- //! PERFORM pg_notify('ow_task_created ', NEW.id::text);
27+ //! PERFORM pg_notify('ow_tasks_created ', NEW.id::text);
3028//! RETURN NEW;
3129//! END;
3230//! $$ LANGUAGE plpgsql;
@@ -48,8 +46,7 @@ use uuid::Uuid;
4846#[ derive( Debug ) ]
4947pub struct DbTask {
5048 pub id : Uuid ,
51- pub script : Option < String > ,
52- pub code : Option < String > ,
49+ pub script : String ,
5350 pub payload : Option < JsonValue > ,
5451 pub created_at : DateTime < Utc > ,
5552}
@@ -115,7 +112,7 @@ impl DbPool {
115112 FOR UPDATE SKIP LOCKED
116113 LIMIT 1
117114 )
118- RETURNING id, script, code, payload, created_at
115+ RETURNING id, script, payload, created_at
119116 "# ,
120117 table = self . table_name
121118 ) ;
@@ -125,7 +122,6 @@ impl DbPool {
125122 Ok ( row. map ( |r| DbTask {
126123 id : r. get ( "id" ) ,
127124 script : r. get ( "script" ) ,
128- code : r. get ( "code" ) ,
129125 payload : r. get ( "payload" ) ,
130126 created_at : r. get ( "created_at" ) ,
131127 } ) )
@@ -396,26 +392,16 @@ async fn process_task<F>(
396392 }
397393}
398394
399- /// Get script content from file or inline code
400- fn get_script_content ( root : & PathBuf , task : & DbTask ) -> Result < String , String > {
401- match ( & task. script , & task. code ) {
402- ( Some ( script_path) , None ) => {
403- let resolved = resolve_script_path ( root, script_path) ?;
395+ /// Get script content from file
396+ pub ( crate ) fn get_script_content ( root : & PathBuf , task : & DbTask ) -> Result < String , String > {
397+ let resolved = resolve_script_path ( root, & task. script ) ?;
404398
405- std:: fs:: read_to_string ( & resolved)
406- . map_err ( |e| format ! ( "Failed to read script '{}': {}" , script_path, e) )
407- }
408-
409- ( None , Some ( code) ) => Ok ( code. clone ( ) ) ,
410-
411- ( Some ( _) , Some ( _) ) => Err ( "Task has both 'script' and 'code' defined" . to_string ( ) ) ,
412-
413- ( None , None ) => Err ( "Task has neither 'script' nor 'code' defined" . to_string ( ) ) ,
414- }
399+ std:: fs:: read_to_string ( & resolved)
400+ . map_err ( |e| format ! ( "Failed to read script '{}': {}" , task. script, e) )
415401}
416402
417403/// Resolve script path safely within the root directory
418- fn resolve_script_path ( root : & PathBuf , script_path : & str ) -> Result < PathBuf , String > {
404+ pub ( crate ) fn resolve_script_path ( root : & PathBuf , script_path : & str ) -> Result < PathBuf , String > {
419405 // Reject absolute paths
420406 if script_path. starts_with ( '/' ) || script_path. starts_with ( '\\' ) {
421407 return Err ( format ! ( "Absolute paths are not allowed: '{}'" , script_path) ) ;
@@ -444,3 +430,146 @@ fn resolve_script_path(root: &PathBuf, script_path: &str) -> Result<PathBuf, Str
444430
445431 Ok ( canonical)
446432}
433+
434+ #[ cfg( test) ]
435+ mod tests {
436+ use super :: * ;
437+ use std:: fs;
438+ use tempfile:: TempDir ;
439+
440+ fn make_task ( script : & str ) -> DbTask {
441+ DbTask {
442+ id : Uuid :: new_v4 ( ) ,
443+ script : script. to_string ( ) ,
444+ payload : None ,
445+ created_at : Utc :: now ( ) ,
446+ }
447+ }
448+
449+ // ─────────────────────────────────────────────────────────────────────────
450+ // get_script_content tests
451+ // ─────────────────────────────────────────────────────────────────────────
452+
453+ #[ test]
454+ fn test_get_script_content_from_file ( ) {
455+ let temp_dir = TempDir :: new ( ) . unwrap ( ) ;
456+ // Canonicalize to handle macOS /tmp -> /private/tmp symlink
457+ let root = temp_dir. path ( ) . canonicalize ( ) . unwrap ( ) ;
458+ let script_path = root. join ( "test.js" ) ;
459+ fs:: write ( & script_path, "export default {}" ) . unwrap ( ) ;
460+
461+ let task = make_task ( "test.js" ) ;
462+ let content = get_script_content ( & root, & task) . unwrap ( ) ;
463+
464+ assert_eq ! ( content, "export default {}" ) ;
465+ }
466+
467+ #[ test]
468+ fn test_get_script_content_nested_file ( ) {
469+ let temp_dir = TempDir :: new ( ) . unwrap ( ) ;
470+ let root = temp_dir. path ( ) . canonicalize ( ) . unwrap ( ) ;
471+ let nested_dir = root. join ( "workers" ) ;
472+ fs:: create_dir ( & nested_dir) . unwrap ( ) ;
473+ let script_path = nested_dir. join ( "task.js" ) ;
474+ fs:: write ( & script_path, "console.log('nested')" ) . unwrap ( ) ;
475+
476+ let task = make_task ( "workers/task.js" ) ;
477+ let content = get_script_content ( & root, & task) . unwrap ( ) ;
478+
479+ assert_eq ! ( content, "console.log('nested')" ) ;
480+ }
481+
482+ #[ test]
483+ fn test_get_script_content_file_not_found ( ) {
484+ let temp_dir = TempDir :: new ( ) . unwrap ( ) ;
485+ let root = temp_dir. path ( ) . canonicalize ( ) . unwrap ( ) ;
486+ let task = make_task ( "nonexistent.js" ) ;
487+
488+ let result = get_script_content ( & root, & task) ;
489+ assert ! ( result. is_err( ) ) ;
490+ }
491+
492+ // ─────────────────────────────────────────────────────────────────────────
493+ // resolve_script_path tests
494+ // ─────────────────────────────────────────────────────────────────────────
495+
496+ #[ test]
497+ fn test_resolve_script_path_valid ( ) {
498+ let temp_dir = TempDir :: new ( ) . unwrap ( ) ;
499+ // Canonicalize to handle macOS /tmp -> /private/tmp symlink
500+ let root = temp_dir. path ( ) . canonicalize ( ) . unwrap ( ) ;
501+ let script_path = root. join ( "worker.js" ) ;
502+ fs:: write ( & script_path, "" ) . unwrap ( ) ;
503+
504+ let result = resolve_script_path ( & root, "worker.js" ) ;
505+ assert ! ( result. is_ok( ) ) ;
506+ assert_eq ! ( result. unwrap( ) , script_path) ;
507+ }
508+
509+ #[ test]
510+ fn test_resolve_script_path_nested ( ) {
511+ let temp_dir = TempDir :: new ( ) . unwrap ( ) ;
512+ let root = temp_dir. path ( ) . canonicalize ( ) . unwrap ( ) ;
513+ let nested_dir = root. join ( "workers" ) ;
514+ fs:: create_dir ( & nested_dir) . unwrap ( ) ;
515+ let script_path = nested_dir. join ( "task.js" ) ;
516+ fs:: write ( & script_path, "" ) . unwrap ( ) ;
517+
518+ let result = resolve_script_path ( & root, "workers/task.js" ) ;
519+ assert ! ( result. is_ok( ) ) ;
520+ }
521+
522+ #[ test]
523+ fn test_resolve_script_path_absolute_rejected ( ) {
524+ let temp_dir = TempDir :: new ( ) . unwrap ( ) ;
525+ let root = temp_dir. path ( ) . canonicalize ( ) . unwrap ( ) ;
526+
527+ let result = resolve_script_path ( & root, "/etc/passwd" ) ;
528+ assert ! ( result. is_err( ) ) ;
529+ assert ! ( result. unwrap_err( ) . contains( "Absolute paths" ) ) ;
530+ }
531+
532+ #[ test]
533+ fn test_resolve_script_path_traversal_rejected ( ) {
534+ let temp_dir = TempDir :: new ( ) . unwrap ( ) ;
535+ let root = temp_dir. path ( ) . canonicalize ( ) . unwrap ( ) ;
536+
537+ let result = resolve_script_path ( & root, "../etc/passwd" ) ;
538+ assert ! ( result. is_err( ) ) ;
539+ assert ! ( result. unwrap_err( ) . contains( "traversal" ) ) ;
540+ }
541+
542+ #[ test]
543+ fn test_resolve_script_path_not_found ( ) {
544+ let temp_dir = TempDir :: new ( ) . unwrap ( ) ;
545+ let root = temp_dir. path ( ) . canonicalize ( ) . unwrap ( ) ;
546+
547+ let result = resolve_script_path ( & root, "missing.js" ) ;
548+ assert ! ( result. is_err( ) ) ;
549+ assert ! ( result. unwrap_err( ) . contains( "not found" ) ) ;
550+ }
551+
552+ // ─────────────────────────────────────────────────────────────────────────
553+ // DbPool channel name tests
554+ // ─────────────────────────────────────────────────────────────────────────
555+
556+ #[ test]
557+ fn test_channel_name_from_table ( ) {
558+ // Test the channel name generation logic
559+ let table_name = "ow_tasks" ;
560+ let expected = "ow_tasks_created" ;
561+ let channel = format ! ( "{}_created" , table_name. replace( '.' , "_" ) ) ;
562+
563+ assert_eq ! ( channel, expected) ;
564+ }
565+
566+ #[ test]
567+ fn test_channel_name_with_schema ( ) {
568+ // Table with schema prefix
569+ let table_name = "public.my_tasks" ;
570+ let expected = "public_my_tasks_created" ;
571+ let channel = format ! ( "{}_created" , table_name. replace( '.' , "_" ) ) ;
572+
573+ assert_eq ! ( channel, expected) ;
574+ }
575+ }
0 commit comments