88use Flowpack \JobQueue \Common \Queue \Message ;
99use Neos \Cache \Frontend \FrontendInterface ;
1010use Neos \Flow \Cli \ConsoleOutput ;
11+ use React \ChildProcess \Process ;
1112use React \EventLoop ;
12- use Symfony \Component \Process \InputStream ;
13- use Symfony \Component \Process \Process ;
1413use t3n \JobQueue \RabbitMQ \Queue \RabbitQueue ;
1514
1615use function array_shift ;
@@ -23,8 +22,10 @@ final class Worker
2322{
2423 protected readonly ConsoleOutput $ output ;
2524
25+ private readonly EventLoop \LoopInterface $ loop ;
26+
2627 /**
27- * @var array{input: InputStream, process: Process} []
28+ * @var Process[]
2829 */
2930 private array $ pool = [];
3031
@@ -40,6 +41,7 @@ public function __construct(
4041 protected readonly FrontendInterface $ messageCache ,
4142 protected readonly Lock $ lock
4243 ) {
44+ $ this ->loop = EventLoop \Loop::get ();
4345 }
4446
4547 public function prepare (): void
@@ -102,47 +104,32 @@ public function executeMessage(Message $message): void
102104 }
103105 }
104106
105- /**
106- * @return array{input: InputStream, process: Process}
107- */
108- private function createProcess (): array
107+ private function createProcess (): Process
109108 {
110- $ input = new InputStream ();
111- $ process = Process::fromShellCommandline (
112- command: $ this ->command ,
113- input: $ input ,
114- timeout: 0
115- );
116- $ process ->start ();
117- return ['input ' => $ input , 'process ' => $ process ];
109+ $ process = new Process ($ this ->command );
110+ $ timer = $ this ->loop ->addPeriodicTimer (0.01 , function () {
111+ // TODO: Add keepalive for database if necessary
112+ });
113+ $ process ->on ('exit ' , function () use ($ timer ) {
114+ $ this ->loop ->cancelTimer ($ timer );
115+ $ this ->loop ->stop ();
116+ });
117+ $ process ->start (loop: $ this ->loop , interval: 0.01 );
118+ return $ process ;
118119 }
119120
120121 private function runFromPool (string $ messageCacheIdentifier ): Process
121122 {
122123 $ this ->cleanPool ();
123- ['input ' => $ input , 'process ' => $ process ] = array_shift ($ this ->pool );
124- $ this ->pool [] = $ this ->createProcess ();
125-
126- assert ($ input instanceof InputStream);
124+ $ process = array_shift ($ this ->pool );
127125 assert ($ process instanceof Process);
128126
129- $ input ->write ($ messageCacheIdentifier . PHP_EOL );
130-
131- $ loop = EventLoop \Loop::get ();
132- $ loop ->addPeriodicTimer (0.01 , function (EventLoop \TimerInterface $ timer ) use ($ process , $ loop ) {
133- try {
134- fputs (STDOUT , $ process ->getIncrementalOutput ());
135- fputs (STDERR , $ process ->getIncrementalErrorOutput ());
136- } catch (\Throwable $ e ) {
137- }
127+ $ process ->stdout ->on ('data ' , fn ($ chunk ) => fputs (STDOUT , $ chunk ));
128+ $ process ->stderr ->on ('data ' , fn ($ chunk ) => fputs (STDERR , $ chunk ));
138129
139- if (!$ process ->isRunning ()) {
140- $ loop ->cancelTimer ($ timer );
141- $ loop ->stop ();
142- }
143- });
130+ $ process ->stdin ->write ($ messageCacheIdentifier . PHP_EOL );
144131
145- $ loop ->run ();
132+ $ this -> loop ->run ();
146133
147134 return $ process ;
148135 }
@@ -151,7 +138,7 @@ private function cleanPool(): void
151138 {
152139 $ this ->pool = array_filter (
153140 $ this ->pool ,
154- fn (array $ item ) => $ item [ ' process ' ] ->isRunning ()
141+ fn (Process $ process ) => $ process ->isRunning ()
155142 );
156143 while (count ($ this ->pool ) < $ this ->poolSize ) {
157144 $ this ->pool [] = $ this ->createProcess ();
0 commit comments