@@ -5,6 +5,7 @@ import { Cancellation } from '@dolittle/sdk.resilience';
55import * as grpc from '@grpc/grpc-js' ;
66import { Observable , Subject } from 'rxjs' ;
77import { concatMap } from 'rxjs/operators' ;
8+ import { CouldNotConnectToRuntime } from './CouldNotConnectToRuntime' ;
89import { ClientStreamMethod , DuplexMethod , ServerStreamMethod , UnaryMethod } from './GrpcMethods' ;
910
1011
@@ -21,7 +22,7 @@ export function reactiveUnary<TArgument, TResponse>(client: grpc.Client, method:
2122 const metadata = new grpc . Metadata ( ) ;
2223 const call = method . call ( client , argument , metadata , { } , ( error : grpc . ServiceError | null , message ?: TResponse ) => {
2324 if ( error ) {
24- subject . error ( error ) ;
25+ subject . error ( getErrorFromGrpc ( error , client . getChannel ( ) . getTarget ( ) ) ) ;
2526 } else {
2627 subject . next ( message ) ;
2728 subject . complete ( ) ;
@@ -44,14 +45,14 @@ export function reactiveClientStream<TRequest, TResponse>(client: grpc.Client, m
4445 const metadata = new grpc . Metadata ( ) ;
4546 const stream = method . call ( client , metadata , { } , ( error : grpc . ServiceError | null , message ?: TResponse ) => {
4647 if ( error ) {
47- subject . error ( error ) ;
48+ subject . error ( getErrorFromGrpc ( error , client . getChannel ( ) . getTarget ( ) ) ) ;
4849 } else {
4950 subject . next ( message ) ;
5051 subject . complete ( ) ;
5152 }
5253 } ) ;
5354 handleCancellation ( stream , cancellation ) ;
54- handleClientRequests ( stream , requests , subject ) ;
55+ handleClientRequests ( stream , requests , subject , client . getChannel ( ) . getTarget ( ) ) ;
5556 return subject ;
5657}
5758
@@ -84,7 +85,7 @@ export function reactiveDuplex<TRequest, TResponse>(client: grpc.Client, method:
8485 const metadata = new grpc . Metadata ( ) ;
8586 const stream = method . call ( client , metadata , { } ) ;
8687 handleCancellation ( stream , cancellation ) ;
87- handleClientRequests ( stream , requests , subject ) ;
88+ handleClientRequests ( stream , requests , subject , client . getChannel ( ) . getTarget ( ) ) ;
8889 handleServerResponses ( stream , subject ) ;
8990 return subject ;
9091}
@@ -107,7 +108,7 @@ function handleCancellation(call: grpc.Call, cancellation: Cancellation) {
107108 * @param {Observable } requests The requests to write to the Runtime.
108109 * @param {Subject } subject The Subject which contains the responses from the Runtime.
109110 */
110- function handleClientRequests < TRequest , TResponse > ( stream : grpc . ClientWritableStream < TRequest > , requests : Observable < TRequest > , subject : Subject < TResponse > ) {
111+ function handleClientRequests < TRequest , TResponse > ( stream : grpc . ClientWritableStream < TRequest > , requests : Observable < TRequest > , subject : Subject < TResponse > , address : string ) {
111112 requests . pipe ( concatMap ( ( message : TRequest ) => {
112113 const subject = new Subject < void > ( ) ;
113114 stream . write ( message , undefined , ( ) => {
@@ -120,6 +121,9 @@ function handleClientRequests<TRequest, TResponse>(stream: grpc.ClientWritableSt
120121 } ,
121122 error : ( error : any ) => {
122123 stream . cancel ( ) ;
124+ if ( isGrpcError ( error ) ) {
125+ error = getErrorFromGrpc ( error , address ) ;
126+ }
123127 subject . error ( error ) ;
124128 }
125129 } ) ;
@@ -141,3 +145,15 @@ function handleServerResponses<TResponse>(stream: grpc.ClientReadableStream<TRes
141145 subject . error ( error ) ;
142146 } ) ;
143147}
148+
149+ function getErrorFromGrpc ( error : grpc . ServiceError , address : string ) {
150+ if ( error . code === grpc . status . UNAVAILABLE ) {
151+ return new CouldNotConnectToRuntime ( address ) ;
152+ } else {
153+ return error ;
154+ }
155+ }
156+
157+ function isGrpcError ( error : any ) : error is grpc . ServiceError {
158+ return error . code !== undefined ;
159+ }
0 commit comments