1010use PgAsync \Command \Execute ;
1111use PgAsync \Command \Parse ;
1212use PgAsync \Command \PasswordMessage ;
13+ use PgAsync \Command \SSLRequest ;
1314use PgAsync \Command \Sync ;
1415use PgAsync \Command \Terminate ;
1516use PgAsync \Message \Authentication ;
3031use PgAsync \Message \ReadyForQuery ;
3132use PgAsync \Message \RowDescription ;
3233use PgAsync \Command \StartupMessage ;
34+ use React \EventLoop \Loop ;
3335use React \EventLoop \LoopInterface ;
34- use React \Socket \Connector ;
36+ use React \Promise \Promise ;
37+ use React \Socket \ConnectionInterface ;
38+ use WyriHaximus \React \Socket \Connector ;
3539use React \Socket \ConnectorInterface ;
40+ use WyriHaximus \React \Socket \OpportunisticTlsConnectionInterface ;
3641use React \Stream \DuplexStreamInterface ;
3742use Rx \Disposable \CallbackDisposable ;
3843use Rx \Disposable \EmptyDisposable ;
4146use Rx \ObserverInterface ;
4247use Rx \SchedulerInterface ;
4348use Rx \Subject \Subject ;
49+ use function React \Promise \resolve ;
50+ use function React \Promise \Stream \first ;
4451
4552class Connection extends EventEmitter
4653{
@@ -71,6 +78,16 @@ class Connection extends EventEmitter
7178 const CONNECTION_NEEDED = 8 ; /* Internal state: connect() needed */
7279 const CONNECTION_CLOSED = 9 ;
7380
81+ // Reference table: https://www.postgresql.org/docs/current/libpq-ssl.html#LIBPQ-SSL-PROTECTION
82+ const TLS_MODE_DISABLE = 'disable ' ;
83+ const TLS_MODE_ALLOW = 'allow ' ;
84+ const TLS_MODE_PREFER = 'prefer ' ;
85+ const TLS_MODE_REQUIRE = 'require ' ;
86+ const TLS_MODE_VERIFY_CA = 'verify-ca ' ;
87+ const TLS_MODE_VERIFY_FULL = 'verify-full ' ;
88+ const TLS_MODE_LIST_FULL = [self ::TLS_MODE_DISABLE , self ::TLS_MODE_ALLOW , self ::TLS_MODE_PREFER , self ::TLS_MODE_REQUIRE , self ::TLS_MODE_VERIFY_CA , self ::TLS_MODE_VERIFY_FULL ];
89+ const TLS_MODE_LIST_REQUIRED = [self ::TLS_MODE_REQUIRE , self ::TLS_MODE_VERIFY_CA , self ::TLS_MODE_VERIFY_FULL ];
90+
7491 private $ queryState ;
7592 private $ queryType ;
7693 private $ connStatus ;
@@ -129,6 +146,8 @@ class Connection extends EventEmitter
129146
130147 /** @var bool */
131148 private $ auto_disconnect = false ;
149+ private $ tls = self ::TLS_MODE_PREFER ;
150+ private $ tlsConnectorFlags = [];
132151 private $ password ;
133152
134153 public function __construct (array $ parameters , LoopInterface $ loop , ConnectorInterface $ connector = null )
@@ -153,6 +172,19 @@ public function __construct(array $parameters, LoopInterface $loop, ConnectorInt
153172 unset($ parameters ['password ' ]);
154173 }
155174
175+ if (array_key_exists ('tls ' , $ parameters )) {
176+ if (!in_array ($ this ->tls , self ::TLS_MODE_LIST_FULL )) {
177+ throw new \InvalidArgumentException ('TLS mode must be one off " ' . implode (', ' , self ::TLS_MODE_LIST_FULL ) . ' but got " ' . $ parameters ['tls ' ] . '" instead ' );
178+ }
179+ $ this ->tls = $ parameters ['tls ' ];
180+ unset($ parameters ['tls ' ]);
181+ }
182+
183+ if (array_key_exists ('tls_connector_flags ' , $ parameters )) {
184+ $ this ->tlsConnectorFlags = $ parameters ['tls_connector_flags ' ];
185+ unset($ parameters ['tls_connector_flags ' ]);
186+ }
187+
156188 if (isset ($ parameters ['auto_disconnect ' ])) {
157189 $ this ->auto_disconnect = $ parameters ['auto_disconnect ' ];
158190 unset($ parameters ['auto_disconnect ' ]);
@@ -167,8 +199,17 @@ public function __construct(array $parameters, LoopInterface $loop, ConnectorInt
167199 $ this ->queryState = static ::STATE_BUSY ;
168200 $ this ->queryType = static ::QUERY_SIMPLE ;
169201 $ this ->connStatus = static ::CONNECTION_NEEDED ;
170- $ this ->socket = $ connector ?: new Connector ($ loop );
171- $ this ->uri = 'tcp:// ' . $ parameters ['host ' ] . ': ' . $ parameters ['port ' ];
202+ $ this ->socket = $ connector ?: new Connector ($ loop , [
203+ 'tls ' => [
204+ 'verify_peer ' => $ this ->tls === self ::TLS_MODE_VERIFY_FULL ,
205+ 'verify_peer_name ' => $ this ->tls === self ::TLS_MODE_VERIFY_FULL ,
206+ 'allow_self_signed ' => $ this ->tls !== self ::TLS_MODE_VERIFY_FULL ,
207+ ] + $ this ->tlsConnectorFlags ,
208+ ]);
209+ // We always url `opportunistic+tls` as scheme because the logic required for using `tcp` on TLS `disable`
210+ // mode is more complex than worth it when connecting to the server. And the `SecureConnector` gives us a
211+ // plaint text connection with all TLS flags already set and ready to use for all the other modes.
212+ $ this ->uri = 'opportunistic+tls:// ' . $ parameters ['host ' ] . ': ' . $ parameters ['port ' ];
172213 $ this ->notificationSubject = new Subject ();
173214 $ this ->cancelPending = false ;
174215 $ this ->cancelRequested = false ;
@@ -185,23 +226,43 @@ private function start()
185226 $ this ->connStatus = static ::CONNECTION_STARTED ;
186227
187228 $ this ->socket ->connect ($ this ->uri )->then (
188- function (DuplexStreamInterface $ stream ) {
189- $ this ->stream = $ stream ;
190- $ this ->connStatus = static ::CONNECTION_MADE ;
191-
192- $ stream ->on ('close ' , [$ this , 'onClose ' ]);
229+ function (OpportunisticTlsConnectionInterface $ stream ) {
230+ (new Promise (function (callable $ resolve , callable $ reject ) use ($ stream ) {
231+ if ($ this ->tls !== self ::TLS_MODE_DISABLE ) {
232+ first ($ stream )->then (function ($ data ) use ($ resolve , $ reject , $ stream ) {
233+ if (trim ($ data ) === 'S ' ) {
234+ $ stream ->enableEncryption ()->then ($ resolve , $ reject );
235+ return ;
236+ }
237+
238+ if (in_array ($ this ->tls , self ::TLS_MODE_LIST_REQUIRED )) {
239+ $ reject (new \RuntimeException ('Failed to encrypt connection while required ' ));
240+ return ;
241+ }
242+
243+ $ resolve ($ stream );
244+ }, $ reject );
245+
246+ $ ssl = new SSLRequest ();
247+ $ stream ->write ($ ssl ->encodedMessage ());
248+ return ;
249+ }
193250
194- $ stream ->on ('data ' , [$ this , 'onData ' ]);
251+ $ resolve ($ stream );
252+ }))->then (function (DuplexStreamInterface $ stream ) {
253+ $ this ->stream = $ stream ;
254+ $ this ->connStatus = static ::CONNECTION_MADE ;
195255
196- // $ssl = new SSLRequest( );
197- // $stream->write($ssl->encodedMessage() );
256+ $ stream -> on ( ' close ' , [ $ this , ' onClose ' ] );
257+ $ stream ->on ( ' data ' , [ $ this , ' onData ' ] );
198258
199- $ startupParameters = $ this ->parameters ;
200- unset($ startupParameters ['host ' ], $ startupParameters ['port ' ]);
259+ $ startupParameters = $ this ->parameters ;
260+ unset($ startupParameters ['host ' ], $ startupParameters ['port ' ]);
201261
202- $ startup = new StartupMessage ();
203- $ startup ->setParameters ($ startupParameters );
204- $ stream ->write ($ startup ->encodedMessage ());
262+ $ startup = new StartupMessage ();
263+ $ startup ->setParameters ($ startupParameters );
264+ $ stream ->write ($ startup ->encodedMessage ());
265+ })->done ();
205266 },
206267 function ($ e ) {
207268 // connection error
@@ -566,11 +627,11 @@ function (ObserverInterface $observer, SchedulerInterface $scheduler = null) use
566627 $ this ->processQueue ();
567628
568629 return new CallbackDisposable (function () use ($ q ) {
569- if ($ this ->currentCommand === $ q && $ q ->isActive ()) {
570- $ this ->cancelRequested = true ;
571- }
572- $ q ->cancel ();
573- });
630+ if ($ this ->currentCommand === $ q && $ q ->isActive ()) {
631+ $ this ->cancelRequested = true ;
632+ }
633+ $ q ->cancel ();
634+ });
574635 }
575636 );
576637
0 commit comments