44
55import { StreamId , PartitionId , ScopeId } from '@dolittle/sdk.events' ;
66import { MicroserviceId , TenantId } from '@dolittle/sdk.execution' ;
7+ import { Observable } from 'rxjs' ;
8+ import { filter } from 'rxjs/operators' ;
79
810import { Subscription } from './Subscription' ;
9- import { SubscriptionCallbacks , SubscriptionCompleted , SubscriptionFailed , SubscriptionSucceeded } from './SubscriptionCallbacks' ;
11+ import { SubscriptionCallbackArguments , SubscriptionCallbacks , SubscriptionCompleted , SubscriptionFailed , SubscriptionSucceeded } from './SubscriptionCallbacks' ;
1012
1113/**
1214 * Represents the builder for building subscriptions on a tenant.
1315 */
1416export class SubscriptionBuilderForConsumerScope {
17+ private readonly _callbacks : ( ( subscriptionCallback : SubscriptionCallbacks ) => void ) [ ] = [ ] ;
1518
16- private readonly _callbacks : SubscriptionCallbacks = new SubscriptionCallbacks ( ) ;
1719 /**
1820 * Initializes a new instance of {@link SubscriptionBuilderForConsumerScope}.
1921 * @param {MicroserviceId } _producerMicroserviceId The microservice the subscriptions are for.
20- * @param {Observable<SubscriptionCallbackArguments> } responsesSource The source of responses.
2122 */
2223 constructor (
2324 private readonly _producerMicroserviceId : MicroserviceId ,
@@ -34,7 +35,7 @@ export class SubscriptionBuilderForConsumerScope {
3435 * @summary The callback will be called on each subscription.
3536 */
3637 onCompleted ( completed : SubscriptionCompleted ) : SubscriptionBuilderForConsumerScope {
37- this . _callbacks . onCompleted ( completed ) ;
38+ this . _callbacks . push ( subscriptionCallbacks => subscriptionCallbacks . onCompleted ( completed ) ) ;
3839 return this ;
3940 }
4041
@@ -45,7 +46,7 @@ export class SubscriptionBuilderForConsumerScope {
4546 * @summary The callback will be called on each subscription.
4647 */
4748 onSuccess ( succeeded : SubscriptionSucceeded ) : SubscriptionBuilderForConsumerScope {
48- this . _callbacks . onSucceeded ( succeeded ) ;
49+ this . _callbacks . push ( subscriptionCallbacks => subscriptionCallbacks . onSucceeded ( succeeded ) ) ;
4950 return this ;
5051 }
5152
@@ -56,7 +57,7 @@ export class SubscriptionBuilderForConsumerScope {
5657 * @summary The callback will be called on each subscription.
5758 */
5859 onFailure ( failed : SubscriptionFailed ) : SubscriptionBuilderForConsumerScope {
59- this . _callbacks . onFailed ( failed ) ;
60+ this . _callbacks . push ( subscriptionCallbacks => subscriptionCallbacks . onFailed ( failed ) ) ;
6061 return this ;
6162 }
6263
@@ -65,7 +66,23 @@ export class SubscriptionBuilderForConsumerScope {
6566 * @param {Observable<SubscriptionCallbackArguments } callbackArgumentsSource The observable source of responses.
6667 * @returns {Subscription }
6768 */
68- build ( ) : Subscription {
69- return new Subscription ( this . _consumerScopeId , this . _producerMicroserviceId , this . _producerTenantId , this . _producerStreamId , this . _producerPartitionId , this . _callbacks ) ;
69+ build ( callbackArgumentsSource : Observable < SubscriptionCallbackArguments > ) : Subscription {
70+ const subscriptionCallbacks = new SubscriptionCallbacks (
71+ callbackArgumentsSource . pipe ( filter ( _ =>
72+ _ . subscription . microservice . equals ( this . _producerMicroserviceId ) &&
73+ _ . subscription . partition . equals ( this . _producerPartitionId ) &&
74+ _ . subscription . scope . equals ( this . _consumerScopeId ) &&
75+ _ . subscription . stream . equals ( this . _producerStreamId ) &&
76+ _ . subscription . tenant . equals ( this . _producerTenantId ) ) ) ) ;
77+ for ( const callback of this . _callbacks ) {
78+ callback ( subscriptionCallbacks ) ;
79+ }
80+ return new Subscription (
81+ this . _consumerScopeId ,
82+ this . _producerMicroserviceId ,
83+ this . _producerTenantId ,
84+ this . _producerStreamId ,
85+ this . _producerPartitionId ,
86+ subscriptionCallbacks ) ;
7087 }
7188}
0 commit comments