@@ -37,6 +37,8 @@ public List<Result> bulkInsert(Collection collection, List<Tuple2<String, Object
3737
3838 // Emit error Results as part of the stream and collect at the end
3939 // This is thread-safe and avoids synchronization overhead
40+ // Dynamic concurrency: use minimum of batch size and reasonable parallelism limit
41+ int concurrency = Math .min (documents .size (), 20 );
4042 return Flux .fromIterable (documents )
4143 .flatMap (documentToInsert -> {
4244 String k = documentToInsert .getT1 ();
@@ -45,7 +47,7 @@ public List<Result> bulkInsert(Collection collection, List<Tuple2<String, Object
4547 return reactiveCollection .insert (k , v , insertOptions )
4648 .then (Mono .<Result >empty ())
4749 .onErrorResume (error -> Mono .just (new Result (k , v , error , false )));
48- })
50+ }, concurrency )
4951 .collectList ()
5052 .block ();
5153 }
@@ -56,6 +58,8 @@ public List<Result> bulkUpsert(Collection collection, List<Tuple2<String, Object
5658
5759 // Emit error Results as part of the stream and collect at the end
5860 // This is thread-safe and avoids synchronization overhead
61+ // Dynamic concurrency: use minimum of batch size and reasonable parallelism limit
62+ int concurrency = Math .min (documents .size (), 20 );
5963 return Flux .fromIterable (documents )
6064 .flatMap (documentToInsert -> {
6165 String k = documentToInsert .getT1 ();
@@ -64,13 +68,15 @@ public List<Result> bulkUpsert(Collection collection, List<Tuple2<String, Object
6468 return reactiveCollection .upsert (k , v , upsertOptions )
6569 .then (Mono .<Result >empty ())
6670 .onErrorResume (error -> Mono .just (new Result (k , v , error , false )));
67- })
71+ }, concurrency )
6872 .collectList ()
6973 .block ();
7074 }
7175
7276 public List <Tuple2 <String , Object >> bulkGets (Collection collection , List <Tuple2 <String , Object >> documents , GetOptions getOptions ) {
7377 final ReactiveCollection reactiveCollection = collection .reactive ();
78+ // Dynamic concurrency: use minimum of batch size and reasonable parallelism limit
79+ int concurrency = Math .min (documents .size (), 20 );
7480 List <Tuple2 <String , Object >> returnValue = Flux .fromIterable (documents )
7581 .flatMap (new Function <Tuple2 <String , Object >, Publisher <Tuple2 <String , Object >>>() {
7682 public Publisher <Tuple2 <String , Object >> apply (Tuple2 <String , Object > documentToInsert ) {
@@ -86,7 +92,7 @@ public Mono<Tuple2<String, Object>> apply(Throwable error) {
8692 }
8793 });
8894 }
89- }).collectList ().block ();
95+ }, concurrency ).collectList ().block ();
9096 return returnValue ;
9197 }
9298
@@ -95,19 +101,23 @@ public List<Result> bulkDelete(Collection collection, List<String> keys, RemoveO
95101
96102 // Emit error Results as part of the stream and collect at the end
97103 // This is thread-safe and avoids synchronization overhead
104+ // Dynamic concurrency: use minimum of batch size and reasonable parallelism limit
105+ int concurrency = Math .min (keys .size (), 20 );
98106 return Flux .fromIterable (keys )
99107 .flatMap (key -> {
100108 return reactiveCollection .remove (key , removeOptions )
101109 .then (Mono .<Result >empty ())
102110 .onErrorResume (error -> Mono .just (new Result (key , null , error , false )));
103- })
111+ }, concurrency )
104112 .collectList ()
105113 .block ();
106114 }
107115
108116 public List <ConcurrentHashMap <String , Object >> bulkReplace (Collection collection , List <Tuple2 <String , Object >> documents ,
109117 ReplaceOptions replaceOptions ) {
110118 final ReactiveCollection reactiveCollection = collection .reactive ();
119+ // Dynamic concurrency: use minimum of batch size and reasonable parallelism limit
120+ int concurrency = Math .min (documents .size (), 20 );
111121 List <ConcurrentHashMap <String , Object >> returnValue = Flux .fromIterable (documents )
112122 .flatMap (new Function <Tuple2 <String , Object >, Publisher <ConcurrentHashMap <String , Object >>>() {
113123 public Publisher <ConcurrentHashMap <String , Object >> apply (Tuple2 <String , Object > documentToInsert ) {
@@ -134,13 +144,15 @@ public Mono<ConcurrentHashMap<String, Object>> apply(Throwable error) {
134144 }
135145 });
136146 }
137- }).collectList ().block ();
147+ }, concurrency ).collectList ().block ();
138148 return returnValue ;
139149 }
140150
141151 public List <ConcurrentHashMap <String , Object >> bulkTouch (Collection collection , List <String > keys , final int exp ,
142152 TouchOptions touchOptions , Duration exp_duration ) {
143153 final ReactiveCollection reactiveCollection = collection .reactive ();
154+ // Dynamic concurrency: use minimum of batch size and reasonable parallelism limit
155+ int concurrency = Math .min (keys .size (), 20 );
144156 List <ConcurrentHashMap <String , Object >> returnValue = Flux .fromIterable (keys )
145157 .flatMap (new Function <String , Publisher <ConcurrentHashMap <String , Object >>>() {
146158 public Publisher <ConcurrentHashMap <String , Object >> apply (String key ){
@@ -163,7 +175,7 @@ public Mono<ConcurrentHashMap<String, Object>> apply(Throwable error) {
163175 }
164176 }).defaultIfEmpty (returnValue );
165177 }
166- }).collectList ().block ();
178+ }, concurrency ).collectList ().block ();
167179 return returnValue ;
168180 }
169181
0 commit comments