11use std:: { collections:: HashMap , future:: Future , marker:: PhantomData , time:: Duration } ;
22
33use temporalio_client:: {
4- ClientInitError , ClientKeepAliveOptions , ClientOptions , ClientTlsOptions , ConfiguredClient ,
5- HttpConnectProxyOptions , RetryClient , RetryOptions , TemporalServiceClient , TlsOptions ,
4+ ClientKeepAliveOptions , ClientTlsOptions , Connection , ConnectionOptions ,
5+ HttpConnectProxyOptions , RetryOptions , TlsOptions , errors :: ClientConnectError ,
66} ;
77
88use magnus:: {
@@ -51,30 +51,28 @@ pub fn init(ruby: &Ruby) -> Result<(), Error> {
5151 Ok ( ( ) )
5252}
5353
54- type CoreClient = RetryClient < ConfiguredClient < TemporalServiceClient > > ;
55-
5654#[ derive( DataTypeFunctions , TypedData ) ]
5755#[ magnus( class = "Temporalio::Internal::Bridge::Client" , free_immediately) ]
5856pub struct Client {
59- pub ( crate ) core : CoreClient ,
57+ pub ( crate ) core : Connection ,
6058 pub ( crate ) runtime_handle : RuntimeHandle ,
6159}
6260
6361#[ macro_export]
6462macro_rules! rpc_call {
65- ( $client: ident, $callback: ident, $call: ident, $trait: tt, $call_name: ident) => { {
63+ ( $client: ident, $callback: ident, $call: ident, $trait: tt, $service_method : ident , $ call_name: ident) => { {
6664 let cancel_token = $call. cancel_token. clone( ) ;
6765 if $call. retry {
68- let mut core_client = $client. core. clone( ) ;
66+ let mut connection = $client. core. clone( ) ;
6967 let req = $call. into_request( ) ?;
7068 $crate:: client:: rpc_resp( $client, $callback, cancel_token, async move {
71- $trait:: $call_name( & mut core_client , req) . await
69+ $trait:: $call_name( & mut connection , req) . await
7270 } )
7371 } else {
74- let mut core_client = $client. core. clone( ) . into_inner ( ) ;
72+ let connection = $client. core. clone( ) ;
7573 let req = $call. into_request( ) ?;
7674 $crate:: client:: rpc_resp( $client, $callback, cancel_token, async move {
77- $trait :: $call_name( & mut core_client , req) . await
75+ connection . $service_method ( ) . $call_name( req) . await
7876 } )
7977 }
8078 } } ;
@@ -90,109 +88,103 @@ impl Client {
9088 . child ( id ! ( "rpc_retry" ) ) ?
9189 . ok_or_else ( || error ! ( "Missing rpc_retry" ) ) ?;
9290 let tls = options. child ( id ! ( "tls" ) ) ?;
93- let opts = ClientOptions :: builder ( )
94- . target_url (
95- Url :: parse (
96- format ! (
97- "{}://{}" ,
98- if tls. is_some( ) { "https" } else { "http" } ,
99- options. member:: <String >( id!( "target_host" ) ) ?
100- )
101- . as_str ( ) ,
91+ let metrics_meter = runtime. handle . core . telemetry ( ) . get_temporal_metric_meter ( ) ;
92+ let opts = ConnectionOptions :: new (
93+ Url :: parse (
94+ format ! (
95+ "{}://{}" ,
96+ if tls. is_some( ) { "https" } else { "http" } ,
97+ options. member:: <String >( id!( "target_host" ) ) ?
10298 )
103- . map_err ( |err| error ! ( "Failed parsing host: {}" , err ) ) ? ,
99+ . as_str ( ) ,
104100 )
105- . client_name ( options. member :: < String > ( id ! ( "client_name" ) ) ?)
106- . client_version ( options. member :: < String > ( id ! ( "client_version" ) ) ?)
107- . headers ( headers. headers )
108- . binary_headers ( headers. binary_headers )
109- . maybe_api_key ( options. member :: < Option < String > > ( id ! ( "api_key" ) ) ?)
110- . identity ( options. member :: < String > ( id ! ( "identity" ) ) ?)
111- . maybe_tls_options ( if let Some ( tls) = tls {
112- Some ( TlsOptions {
113- client_tls_options : match (
114- tls. member :: < Option < RString > > ( id ! ( "client_cert" ) ) ?,
115- tls. member :: < Option < RString > > ( id ! ( "client_private_key" ) ) ?,
101+ . map_err ( |err| error ! ( "Failed parsing host: {}" , err) ) ?,
102+ )
103+ . client_name ( options. member :: < String > ( id ! ( "client_name" ) ) ?)
104+ . client_version ( options. member :: < String > ( id ! ( "client_version" ) ) ?)
105+ . headers ( headers. headers )
106+ . binary_headers ( headers. binary_headers )
107+ . maybe_api_key ( options. member :: < Option < String > > ( id ! ( "api_key" ) ) ?)
108+ . identity ( options. member :: < String > ( id ! ( "identity" ) ) ?)
109+ . maybe_tls_options ( if let Some ( tls) = tls {
110+ Some ( TlsOptions {
111+ client_tls_options : match (
112+ tls. member :: < Option < RString > > ( id ! ( "client_cert" ) ) ?,
113+ tls. member :: < Option < RString > > ( id ! ( "client_private_key" ) ) ?,
114+ ) {
115+ ( None , None ) => None ,
116+ ( Some ( client_cert) , Some ( client_private_key) ) => Some ( ClientTlsOptions {
117+ // These are unsafe because of lifetime issues, but we copy right away
118+ client_cert : unsafe { client_cert. as_slice ( ) . to_vec ( ) } ,
119+ client_private_key : unsafe { client_private_key. as_slice ( ) . to_vec ( ) } ,
120+ } ) ,
121+ _ => {
122+ return Err ( error ! (
123+ "Must have both client cert and private key or neither"
124+ ) ) ;
125+ }
126+ } ,
127+ server_root_ca_cert : tls
128+ . member :: < Option < RString > > ( id ! ( "server_root_ca_cert" ) ) ?
129+ . map ( |rstr| unsafe { rstr. as_slice ( ) . to_vec ( ) } ) ,
130+ domain : tls. member ( id ! ( "domain" ) ) ?,
131+ } )
132+ } else {
133+ None
134+ } )
135+ . retry_options ( RetryOptions {
136+ initial_interval : Duration :: from_secs_f64 ( rpc_retry. member ( id ! ( "initial_interval" ) ) ?) ,
137+ randomization_factor : rpc_retry. member ( id ! ( "randomization_factor" ) ) ?,
138+ multiplier : rpc_retry. member ( id ! ( "multiplier" ) ) ?,
139+ max_interval : Duration :: from_secs_f64 ( rpc_retry. member ( id ! ( "max_interval" ) ) ?) ,
140+ max_elapsed_time : match rpc_retry. member :: < f64 > ( id ! ( "max_elapsed_time" ) ) ? {
141+ // 0 means none
142+ 0.0 => None ,
143+ val => Some ( Duration :: from_secs_f64 ( val) ) ,
144+ } ,
145+ max_retries : rpc_retry. member ( id ! ( "max_retries" ) ) ?,
146+ } )
147+ . keep_alive (
148+ if let Some ( keep_alive) = options. child ( id ! ( "keep_alive" ) ) ? {
149+ Some ( ClientKeepAliveOptions {
150+ interval : Duration :: from_secs_f64 ( keep_alive. member ( id ! ( "interval" ) ) ?) ,
151+ timeout : Duration :: from_secs_f64 ( keep_alive. member ( id ! ( "timeout" ) ) ?) ,
152+ } )
153+ } else {
154+ None
155+ } ,
156+ )
157+ . maybe_http_connect_proxy (
158+ if let Some ( proxy) = options. child ( id ! ( "http_connect_proxy" ) ) ? {
159+ Some ( HttpConnectProxyOptions {
160+ target_addr : proxy. member ( id ! ( "target_host" ) ) ?,
161+ basic_auth : match (
162+ proxy. member :: < Option < String > > ( id ! ( "basic_auth_user" ) ) ?,
163+ proxy. member :: < Option < String > > ( id ! ( "basic_auth_pass" ) ) ?,
116164 ) {
117165 ( None , None ) => None ,
118- ( Some ( client_cert) , Some ( client_private_key) ) => Some ( ClientTlsOptions {
119- // These are unsafe because of lifetime issues, but we copy right away
120- client_cert : unsafe { client_cert. as_slice ( ) . to_vec ( ) } ,
121- client_private_key : unsafe { client_private_key. as_slice ( ) . to_vec ( ) } ,
122- } ) ,
166+ ( Some ( user) , Some ( pass) ) => Some ( ( user, pass) ) ,
123167 _ => {
124- return Err ( error ! (
125- "Must have both client cert and private key or neither"
126- ) ) ;
168+ return Err ( error ! ( "Must have both basic auth and pass or neither" ) ) ;
127169 }
128170 } ,
129- server_root_ca_cert : tls
130- . member :: < Option < RString > > ( id ! ( "server_root_ca_cert" ) ) ?
131- . map ( |rstr| unsafe { rstr. as_slice ( ) . to_vec ( ) } ) ,
132- domain : tls. member ( id ! ( "domain" ) ) ?,
133171 } )
134172 } else {
135173 None
136- } )
137- . retry_options ( RetryOptions {
138- initial_interval : Duration :: from_secs_f64 (
139- rpc_retry. member ( id ! ( "initial_interval" ) ) ?,
140- ) ,
141- randomization_factor : rpc_retry. member ( id ! ( "randomization_factor" ) ) ?,
142- multiplier : rpc_retry. member ( id ! ( "multiplier" ) ) ?,
143- max_interval : Duration :: from_secs_f64 ( rpc_retry. member ( id ! ( "max_interval" ) ) ?) ,
144- max_elapsed_time : match rpc_retry. member :: < f64 > ( id ! ( "max_elapsed_time" ) ) ? {
145- // 0 means none
146- 0.0 => None ,
147- val => Some ( Duration :: from_secs_f64 ( val) ) ,
148- } ,
149- max_retries : rpc_retry. member ( id ! ( "max_retries" ) ) ?,
150- } )
151- . keep_alive (
152- if let Some ( keep_alive) = options. child ( id ! ( "keep_alive" ) ) ? {
153- Some ( ClientKeepAliveOptions {
154- interval : Duration :: from_secs_f64 ( keep_alive. member ( id ! ( "interval" ) ) ?) ,
155- timeout : Duration :: from_secs_f64 ( keep_alive. member ( id ! ( "timeout" ) ) ?) ,
156- } )
157- } else {
158- None
159- } ,
160- )
161- . maybe_http_connect_proxy (
162- if let Some ( proxy) = options. child ( id ! ( "http_connect_proxy" ) ) ? {
163- Some ( HttpConnectProxyOptions {
164- target_addr : proxy. member ( id ! ( "target_host" ) ) ?,
165- basic_auth : match (
166- proxy. member :: < Option < String > > ( id ! ( "basic_auth_user" ) ) ?,
167- proxy. member :: < Option < String > > ( id ! ( "basic_auth_pass" ) ) ?,
168- ) {
169- ( None , None ) => None ,
170- ( Some ( user) , Some ( pass) ) => Some ( ( user, pass) ) ,
171- _ => {
172- return Err ( error ! (
173- "Must have both basic auth and pass or neither"
174- ) ) ;
175- }
176- } ,
177- } )
178- } else {
179- None
180- } ,
181- )
182- . build ( ) ;
174+ } ,
175+ )
176+ . maybe_metrics_meter ( metrics_meter)
177+ . build ( ) ;
183178
184179 // Create client
185180 let callback = AsyncCallback :: from_queue ( queue) ;
186- let core_runtime = runtime. handle . core . clone ( ) ;
187181 let runtime_handle = runtime. handle . clone ( ) ;
188182 runtime. handle . spawn (
189183 async move {
190- let core = opts
191- . connect_no_namespace ( core_runtime. telemetry ( ) . get_temporal_metric_meter ( ) )
192- . await ?;
184+ let core = Connection :: connect ( opts) . await ?;
193185 Ok ( core)
194186 } ,
195- move |ruby, result : Result < CoreClient , ClientInitError > | match result {
187+ move |ruby, result : Result < Connection , ClientConnectError > | match result {
196188 Ok ( core) => callback. push (
197189 & ruby,
198190 Client {
@@ -262,18 +254,16 @@ impl Client {
262254 let ruby = Ruby :: get ( ) . expect ( "Ruby not available" ) ;
263255 let headers = partition_grpc_headers ( & ruby, headers) ?;
264256 self . core
265- . get_client ( )
266257 . set_headers ( headers. headers )
267258 . map_err ( |err| error ! ( "Invalid headers: {}" , err) ) ?;
268259 self . core
269- . get_client ( )
270260 . set_binary_headers ( headers. binary_headers )
271261 . map_err ( |err| error ! ( "Invalid headers: {}" , err) ) ?;
272262 Ok ( ( ) )
273263 }
274264
275265 pub fn update_api_key ( & self , api_key : Option < String > ) {
276- self . core . get_client ( ) . set_api_key ( api_key) ;
266+ self . core . set_api_key ( api_key) ;
277267 }
278268}
279269
0 commit comments