1- using System ;
2- using System . Collections . Concurrent ;
3- using System . Collections . Generic ;
1+ using System . Collections . Concurrent ;
42using BlockingCollectionExtensions . Structures ;
53
6- namespace BlockingCollectionExtensions
4+ namespace BlockingCollectionExtensions ;
5+
6+ /// <summary>
7+ /// Utility methods for adding data to <see cref="BlockingCollection{T}"/> instances.
8+ /// </summary>
9+ public static class AdditiveUtilities
710{
8- public static class AdditiveUtilities
11+ /// <summary>
12+ /// Transfer contents of an enumerable into a target blocking collection.
13+ /// </summary>
14+ public static void AddFromEnumerable < T > (
15+ this BlockingCollection < T > target ,
16+ IEnumerable < T > source ,
17+ bool completeAddingWhenDone = false )
918 {
10- /// <summary>
11- /// Transfer contents of a generic enumerable into a target blocking collection,
12- /// determine whether blocking collection should complete adding when enumerable has finished being added
13- /// </summary>
14- /// <typeparam name="T"></typeparam>
15- /// <param name="target"></param>
16- /// <param name="source"></param>
17- /// <param name="completeAddingWhenDone"></param>
18- public static void AddFromEnumerable < T > ( this BlockingCollection < T > target , IEnumerable < T > source ,
19- bool completeAddingWhenDone )
20- {
21- try
19+ AddFromEnumerable ( target , source , CancellationToken . None , completeAddingWhenDone ) ;
20+ }
21+
22+ /// <summary>
23+ /// Transfer contents of an enumerable into a target blocking collection with cancellation support.
24+ /// </summary>
25+ public static void AddFromEnumerable < T > (
26+ this BlockingCollection < T > target ,
27+ IEnumerable < T > source ,
28+ CancellationToken cancellationToken ,
29+ bool completeAddingWhenDone = false )
30+ {
31+ if ( target is null )
32+ {
33+ throw new ArgumentNullException ( nameof ( target ) ) ;
34+ }
35+
36+ if ( source is null )
37+ {
38+ throw new ArgumentNullException ( nameof ( source ) ) ;
39+ }
40+
41+ try
42+ {
43+ foreach ( var item in source )
2244 {
23- foreach ( var item in source )
24- {
25- target . Add ( item ) ;
26- }
45+ cancellationToken . ThrowIfCancellationRequested ( ) ;
46+ target . Add ( item , cancellationToken ) ;
2747 }
28- finally
48+ }
49+ finally
50+ {
51+ CompleteAddingIfRequested ( target , completeAddingWhenDone ) ;
52+ }
53+ }
54+
55+ /// <summary>
56+ /// Transfer contents from an async enumerable into a target blocking collection.
57+ /// </summary>
58+ public static async Task AddFromAsyncEnumerable < T > (
59+ this BlockingCollection < T > target ,
60+ IAsyncEnumerable < T > source ,
61+ CancellationToken cancellationToken = default ,
62+ bool completeAddingWhenDone = false )
63+ {
64+ if ( target is null )
65+ {
66+ throw new ArgumentNullException ( nameof ( target ) ) ;
67+ }
68+
69+ if ( source is null )
70+ {
71+ throw new ArgumentNullException ( nameof ( source ) ) ;
72+ }
73+
74+ try
75+ {
76+ await foreach ( var item in source . WithCancellation ( cancellationToken ) . ConfigureAwait ( false ) )
2977 {
30- if ( completeAddingWhenDone )
31- {
32- target . CompleteAdding ( ) ;
33- }
78+ target . Add ( item , cancellationToken ) ;
3479 }
3580 }
81+ finally
82+ {
83+ CompleteAddingIfRequested ( target , completeAddingWhenDone ) ;
84+ }
85+ }
3686
37- /// <summary>
38- /// Transfer contents arriving asynchronously in the form of an IObservable, subscribe a delegate that takes any data from the observable
39- /// and adds it to the BlockingCollection
40- /// </summary>
41- /// <typeparam name="T"></typeparam>
42- /// <param name="target"></param>
43- /// <param name="source"></param>
44- /// <param name="completeAddingWhenDone"></param>
45- /// <returns></returns>
46- public static IDisposable AddFromObservable < T > ( this BlockingCollection < T > target , IObservable < T > source ,
47- bool completeAddingWhenDone )
48- {
49- return source . Subscribe ( new DelegateBasedObserver < T >
50- (
87+ /// <summary>
88+ /// Subscribe to an observable and add received values to a blocking collection.
89+ /// </summary>
90+ public static IDisposable AddFromObservable < T > (
91+ this BlockingCollection < T > target ,
92+ IObservable < T > source ,
93+ bool completeAddingWhenDone = false )
94+ {
95+ if ( target is null )
96+ {
97+ throw new ArgumentNullException ( nameof ( target ) ) ;
98+ }
99+
100+ if ( source is null )
101+ {
102+ throw new ArgumentNullException ( nameof ( source ) ) ;
103+ }
104+
105+ return source . Subscribe (
106+ new DelegateBasedObserver < T > (
51107 target . Add ,
52- error =>
53- {
54- if ( completeAddingWhenDone ) target . CompleteAdding ( ) ;
55- } ,
56- ( ) =>
57- {
58- if ( completeAddingWhenDone ) target . CompleteAdding ( ) ;
59- }
60- ) ) ;
108+ _ => CompleteAddingIfRequested ( target , completeAddingWhenDone ) ,
109+ ( ) => CompleteAddingIfRequested ( target , completeAddingWhenDone ) ) ) ;
110+ }
111+
112+ private static void CompleteAddingIfRequested < T > ( BlockingCollection < T > target , bool completeAddingWhenDone )
113+ {
114+ if ( completeAddingWhenDone && ! target . IsAddingCompleted )
115+ {
116+ target . CompleteAdding ( ) ;
61117 }
62118 }
63- }
119+ }
0 commit comments