@@ -460,6 +460,160 @@ def create_options(self):
     return options
 
 
+class PortableRunnerOptimizationTest(unittest.TestCase):
+  """Tests for PortableRunner._optimize_pipeline."""
+
+  def test_default_optimize_expands_sdf(self):
+    """Verify that expand_sdf is applied in the default pre_optimize setting.
+
+    See https://github.com/apache/beam/issues/24422.
+    """
+    from apache_beam.io import restriction_trackers
+    from apache_beam.portability import common_urns
+
+    class ExpandStringsProvider(beam.transforms.core.RestrictionProvider):
+      """Restriction provider that claims one offset per character."""
+      def initial_restriction(self, element):
+        return restriction_trackers.OffsetRange(0, len(element))
+
+      def create_tracker(self, restriction):
+        return restriction_trackers.OffsetRestrictionTracker(restriction)
+
+      def restriction_size(self, element, restriction):
+        return restriction.size()
+
+    class ExpandingStringsDoFn(beam.DoFn):
+      """Splittable DoFn that emits one element per claimed character."""
+      def process(
+          self,
+          element,
+          restriction_tracker=beam.DoFn.RestrictionParam(
+              ExpandStringsProvider())):
+        cur = restriction_tracker.current_restriction().start
+        while restriction_tracker.try_claim(cur):
+          yield element[cur]
+          cur += 1
+
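+    # The optimizer only rewrites the pipeline proto; the pipeline is never
+    # run, so a one-element input is enough.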
+    p = beam.Pipeline()
+    _ = (p | beam.Create(['abc']) | beam.ParDo(ExpandingStringsDoFn()))
+    proto = p.to_runner_api()
+
+    # Default options (no pre_optimize experiment set).
+    options = PipelineOptions()
+    optimized = PortableRunner._optimize_pipeline(proto, options)
+
+    transform_urns = set()
+    for t in optimized.components.transforms.values():
+      if t.spec.urn:
+        transform_urns.add(t.spec.urn)
+
+    self.assertIn(
+        common_urns.sdf_components.PAIR_WITH_RESTRICTION.urn, transform_urns)
+    self.assertIn(
+        common_urns.sdf_components.SPLIT_AND_SIZE_RESTRICTIONS.urn,
+        transform_urns)
+    self.assertIn(
+        common_urns.sdf_components.PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS.urn,
+        transform_urns)
+  def test_custom_optimize_expand_sdf(self):
+    """Verify that expand_sdf can be requested explicitly."""
+    from apache_beam.io import restriction_trackers
+    from apache_beam.portability import common_urns
+
+    class ExpandStringsProvider(beam.transforms.core.RestrictionProvider):
+      """Restriction provider that claims one offset per character."""
+      def initial_restriction(self, element):
+        return restriction_trackers.OffsetRange(0, len(element))
+
+      def create_tracker(self, restriction):
+        return restriction_trackers.OffsetRestrictionTracker(restriction)
+
+      def restriction_size(self, element, restriction):
+        return restriction.size()
+
+    class ExpandingStringsDoFn(beam.DoFn):
+      """Splittable DoFn that emits one element per claimed character."""
+      def process(
+          self,
+          element,
+          restriction_tracker=beam.DoFn.RestrictionParam(
+              ExpandStringsProvider())):
+        cur = restriction_tracker.current_restriction().start
+        while restriction_tracker.try_claim(cur):
+          yield element[cur]
+          cur += 1
+
+    p = beam.Pipeline()
+    _ = (p | beam.Create(['abc']) | beam.ParDo(ExpandingStringsDoFn()))
+    proto = p.to_runner_api()
+
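+    # The pre_optimize experiment selects optimization phases by name; here
+    # only expand_sdf is requested.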
+    options = PipelineOptions(['--experiments=pre_optimize=expand_sdf'])
+    optimized = PortableRunner._optimize_pipeline(proto, options)
+
+    transform_urns = set()
+    for t in optimized.components.transforms.values():
+      if t.spec.urn:
+        transform_urns.add(t.spec.urn)
+
+    self.assertIn(
+        common_urns.sdf_components.PAIR_WITH_RESTRICTION.urn, transform_urns)
+    self.assertIn(
+        common_urns.sdf_components.SPLIT_AND_SIZE_RESTRICTIONS.urn,
+        transform_urns)
+    self.assertIn(
+        common_urns.sdf_components.PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS.urn,
+        transform_urns)
+
+  def test_default_optimize_expands_bounded_read(self):
+    """Verify that iobase.Read(BoundedSource) is expanded by default.
+
+    This is the end-to-end scenario from
+    https://github.com/apache/beam/issues/24422: Read transforms such as
+    ReadFromParquet use SDFs internally. Without expand_sdf in the default
+    optimization, they arrive at the Spark job server as a single ParDo and
+    execute on one partition with no parallelism.
+    """
+    from apache_beam.io import iobase
+    from apache_beam.portability import common_urns
+
+    class _FakeBoundedSource(iobase.BoundedSource):
+      """Minimal BoundedSource; only pipeline construction is exercised."""
+      def get_range_tracker(self, start_position, stop_position):
+        return None
+
+      def read(self, range_tracker):
+        return iter([])
+
+      def estimate_size(self):
+        return 0
+
+    p = beam.Pipeline()
+    _ = p | beam.io.Read(_FakeBoundedSource())
+    proto = p.to_runner_api()
+
+    # Default options (no pre_optimize experiment set).
+    options = PipelineOptions()
+    optimized = PortableRunner._optimize_pipeline(proto, options)
+
+    transform_urns = set()
+    for t in optimized.components.transforms.values():
+      if t.spec.urn:
+        transform_urns.add(t.spec.urn)
+
+    # The SDFBoundedSourceReader DoFn should have been expanded into SDF
+    # component stages.
+    self.assertIn(
+        common_urns.sdf_components.PAIR_WITH_RESTRICTION.urn, transform_urns)
+    self.assertIn(
+        common_urns.sdf_components.SPLIT_AND_SIZE_RESTRICTIONS.urn,
+        transform_urns)
+    self.assertIn(
+        common_urns.sdf_components.PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS.urn,
+        transform_urns)
+    # Reshuffle should be present to enable parallelization.
+    self.assertIn(common_urns.composites.RESHUFFLE.urn, transform_urns)
+
+
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
   unittest.main()