1+ import multiprocessing
2+ from fastflow import FFPipeline , ff_send_out
3+
4+ # Stage 1: Generate numbers from 1 to 10
5+ def number_generator (pipe_out ):
6+ counter = 1
7+ while counter <= 10 :
8+ pipe_out .send (counter )
9+ counter += 1
10+ pipe_out .close ()
11+
12+ # Stage 2: Square each number
13+ def square_node (pipe_in , pipe_out ):
14+ while True :
15+ try :
16+ data = pipe_in .recv ()
17+ pipe_out .send (data * data )
18+ except EOFError :
19+ pipe_out .close ()
20+ break
21+
22+ # Stage 3: Square each number
23+ def subtract_five_node (pipe_in , pipe_out ):
24+ while True :
25+ try :
26+ data = pipe_in .recv ()
27+ pipe_out .send (data - 5 )
28+ except EOFError :
29+ pipe_out .close ()
30+ break
31+
32+ # Create the pipes
33+ pipe1_out , pipe1_in = multiprocessing .Pipe (duplex = False )
34+ pipe2_out , pipe2_in = multiprocessing .Pipe (duplex = False )
35+
36+ # Create the processes
37+ processes = [
38+ multiprocessing .Process (target = number_generator , args = (pipe1_in )),
39+ multiprocessing .Process (target = square_node , args = (pipe1_out , pipe2_in )),
40+ multiprocessing .Process (target = subtract_five_node , args = (pipe2_out , None ))
41+ ]
42+
43+ # Start all processes in the pipeline
44+ for process in processes :
45+ process .start ()
46+
47+ # Wait for all processes to finish
48+ for process in processes :
49+ process .join ()
50+
51+
52+ class NumberGenerator ():
53+ def svc (self ):
54+ for i in range (1 , 11 ):
55+ ff_send_out (i )
56+ return EOS
57+
58+ class SquareNode ():
59+ def svc (self , number ):
60+ return number * number
61+
62+ class SubtractFiveNode ():
63+ def svc (self , number ):
64+ return number - 5
65+
66+ # Build the pipeline and add the stages
67+ pipe = FFPipeline ()
68+ pipe .add_stage (NumberGenerator ())
69+ pipe .add_stage (SquareNode ())
70+ pipe .add_stage (SubtractFiveNode ())
71+
72+ # Run the pipeline
73+ if pipe .run_and_wait_end () < 0 :
74+ # handle error...
0 commit comments