1212use PgAsync \Command \PasswordMessage ;
1313use PgAsync \Command \SaslInitialResponse ;
1414use PgAsync \Command \SaslResponse ;
15+ use PgAsync \Command \SSLRequest ;
1516use PgAsync \Command \Sync ;
1617use PgAsync \Command \Terminate ;
1718use PgAsync \Message \Authentication ;
3233use PgAsync \Message \ReadyForQuery ;
3334use PgAsync \Message \RowDescription ;
3435use PgAsync \Command \StartupMessage ;
36+ use React \EventLoop \Loop ;
3537use React \EventLoop \LoopInterface ;
36- use React \Socket \Connector ;
38+ use React \Promise \Promise ;
39+ use React \Socket \ConnectionInterface ;
40+ use WyriHaximus \React \Socket \Connector ;
3741use React \Socket \ConnectorInterface ;
42+ use WyriHaximus \React \Socket \OpportunisticTlsConnectionInterface ;
3843use React \Stream \DuplexStreamInterface ;
3944use Rx \Disposable \CallbackDisposable ;
4045use Rx \Disposable \EmptyDisposable ;
4348use Rx \ObserverInterface ;
4449use Rx \SchedulerInterface ;
4550use Rx \Subject \Subject ;
51+ use function React \Promise \resolve ;
52+ use function React \Promise \Stream \first ;
4653
4754class Connection extends EventEmitter
4855{
@@ -73,6 +80,16 @@ class Connection extends EventEmitter
7380 const CONNECTION_NEEDED = 8 ; /* Internal state: connect() needed */
7481 const CONNECTION_CLOSED = 9 ;
7582
83+ // Reference table: https://www.postgresql.org/docs/current/libpq-ssl.html#LIBPQ-SSL-PROTECTION
84+ const TLS_MODE_DISABLE = 'disable ' ;
85+ const TLS_MODE_ALLOW = 'allow ' ;
86+ const TLS_MODE_PREFER = 'prefer ' ;
87+ const TLS_MODE_REQUIRE = 'require ' ;
88+ const TLS_MODE_VERIFY_CA = 'verify-ca ' ;
89+ const TLS_MODE_VERIFY_FULL = 'verify-full ' ;
90+ const TLS_MODE_LIST_FULL = [self ::TLS_MODE_DISABLE , self ::TLS_MODE_ALLOW , self ::TLS_MODE_PREFER , self ::TLS_MODE_REQUIRE , self ::TLS_MODE_VERIFY_CA , self ::TLS_MODE_VERIFY_FULL ];
91+ const TLS_MODE_LIST_REQUIRED = [self ::TLS_MODE_REQUIRE , self ::TLS_MODE_VERIFY_CA , self ::TLS_MODE_VERIFY_FULL ];
92+
7693 private $ queryState ;
7794 private $ queryType ;
7895 private $ connStatus ;
@@ -134,6 +151,8 @@ class Connection extends EventEmitter
134151
135152 /** @var bool */
136153 private $ auto_disconnect = false ;
154+ private $ tls = self ::TLS_MODE_PREFER ;
155+ private $ tlsConnectorFlags = [];
137156 private $ password ;
138157
139158 public function __construct (array $ parameters , LoopInterface $ loop , ConnectorInterface $ connector = null )
@@ -158,6 +177,19 @@ public function __construct(array $parameters, LoopInterface $loop, ConnectorInt
158177 unset($ parameters ['password ' ]);
159178 }
160179
180+ if (array_key_exists ('tls ' , $ parameters )) {
181+ if (!in_array ($ this ->tls , self ::TLS_MODE_LIST_FULL )) {
182+ throw new \InvalidArgumentException ('TLS mode must be one off " ' . implode (', ' , self ::TLS_MODE_LIST_FULL ) . ' but got " ' . $ parameters ['tls ' ] . '" instead ' );
183+ }
184+ $ this ->tls = $ parameters ['tls ' ];
185+ unset($ parameters ['tls ' ]);
186+ }
187+
188+ if (array_key_exists ('tls_connector_flags ' , $ parameters )) {
189+ $ this ->tlsConnectorFlags = $ parameters ['tls_connector_flags ' ];
190+ unset($ parameters ['tls_connector_flags ' ]);
191+ }
192+
161193 if (isset ($ parameters ['auto_disconnect ' ])) {
162194 $ this ->auto_disconnect = $ parameters ['auto_disconnect ' ];
163195 unset($ parameters ['auto_disconnect ' ]);
@@ -172,8 +204,17 @@ public function __construct(array $parameters, LoopInterface $loop, ConnectorInt
172204 $ this ->queryState = static ::STATE_BUSY ;
173205 $ this ->queryType = static ::QUERY_SIMPLE ;
174206 $ this ->connStatus = static ::CONNECTION_NEEDED ;
175- $ this ->socket = $ connector ?: new Connector ($ loop );
176- $ this ->uri = 'tcp:// ' . $ parameters ['host ' ] . ': ' . $ parameters ['port ' ];
207+ $ this ->socket = $ connector ?: new Connector ($ loop , [
208+ 'tls ' => [
209+ 'verify_peer ' => $ this ->tls === self ::TLS_MODE_VERIFY_FULL ,
210+ 'verify_peer_name ' => $ this ->tls === self ::TLS_MODE_VERIFY_FULL ,
211+ 'allow_self_signed ' => $ this ->tls !== self ::TLS_MODE_VERIFY_FULL ,
212+ ] + $ this ->tlsConnectorFlags ,
213+ ]);
214+ // We always url `opportunistic+tls` as scheme because the logic required for using `tcp` on TLS `disable`
215+ // mode is more complex than worth it when connecting to the server. And the `SecureConnector` gives us a
216+ // plaint text connection with all TLS flags already set and ready to use for all the other modes.
217+ $ this ->uri = 'opportunistic+tls:// ' . $ parameters ['host ' ] . ': ' . $ parameters ['port ' ];
177218 $ this ->notificationSubject = new Subject ();
178219 $ this ->cancelPending = false ;
179220 $ this ->cancelRequested = false ;
@@ -191,23 +232,43 @@ private function start()
191232 $ this ->connStatus = static ::CONNECTION_STARTED ;
192233
193234 $ this ->socket ->connect ($ this ->uri )->then (
194- function (DuplexStreamInterface $ stream ) {
195- $ this ->stream = $ stream ;
196- $ this ->connStatus = static ::CONNECTION_MADE ;
197-
198- $ stream ->on ('close ' , [$ this , 'onClose ' ]);
235+ function (OpportunisticTlsConnectionInterface $ stream ) {
236+ (new Promise (function (callable $ resolve , callable $ reject ) use ($ stream ) {
237+ if ($ this ->tls !== self ::TLS_MODE_DISABLE ) {
238+ first ($ stream )->then (function ($ data ) use ($ resolve , $ reject , $ stream ) {
239+ if (trim ($ data ) === 'S ' ) {
240+ $ stream ->enableEncryption ()->then ($ resolve , $ reject );
241+ return ;
242+ }
243+
244+ if (in_array ($ this ->tls , self ::TLS_MODE_LIST_REQUIRED )) {
245+ $ reject (new \RuntimeException ('Failed to encrypt connection while required ' ));
246+ return ;
247+ }
248+
249+ $ resolve ($ stream );
250+ }, $ reject );
251+
252+ $ ssl = new SSLRequest ();
253+ $ stream ->write ($ ssl ->encodedMessage ());
254+ return ;
255+ }
199256
200- $ stream ->on ('data ' , [$ this , 'onData ' ]);
257+ $ resolve ($ stream );
258+ }))->then (function (DuplexStreamInterface $ stream ) {
259+ $ this ->stream = $ stream ;
260+ $ this ->connStatus = static ::CONNECTION_MADE ;
201261
202- // $ssl = new SSLRequest( );
203- // $stream->write($ssl->encodedMessage() );
262+ $ stream -> on ( ' close ' , [ $ this , ' onClose ' ] );
263+ $ stream ->on ( ' data ' , [ $ this , ' onData ' ] );
204264
205- $ startupParameters = $ this ->parameters ;
206- unset($ startupParameters ['host ' ], $ startupParameters ['port ' ]);
265+ $ startupParameters = $ this ->parameters ;
266+ unset($ startupParameters ['host ' ], $ startupParameters ['port ' ]);
207267
208- $ startup = new StartupMessage ();
209- $ startup ->setParameters ($ startupParameters );
210- $ stream ->write ($ startup ->encodedMessage ());
268+ $ startup = new StartupMessage ();
269+ $ startup ->setParameters ($ startupParameters );
270+ $ stream ->write ($ startup ->encodedMessage ());
271+ });
211272 },
212273 function ($ e ) {
213274 // connection error
@@ -596,11 +657,11 @@ function (ObserverInterface $observer, SchedulerInterface $scheduler = null) use
596657 $ this ->processQueue ();
597658
598659 return new CallbackDisposable (function () use ($ q ) {
599- if ($ this ->currentCommand === $ q && $ q ->isActive ()) {
600- $ this ->cancelRequested = true ;
601- }
602- $ q ->cancel ();
603- });
660+ if ($ this ->currentCommand === $ q && $ q ->isActive ()) {
661+ $ this ->cancelRequested = true ;
662+ }
663+ $ q ->cancel ();
664+ });
604665 }
605666 );
606667
0 commit comments