1- use futures_util:: future:: BoxFuture ;
1+ use futures_util:: { future:: BoxFuture , FutureExt } ;
22use std:: {
33 fmt:: Debug ,
4+ future:: Future ,
45 ops:: { Deref , DerefMut } ,
56} ;
67
7- use crate :: { Event , Payload } ;
8+ use crate :: { AckId , Event , Payload } ;
89
910use super :: client:: { Client , ReconnectSettings } ;
1011
1112/// Internal type, provides a way to store futures and return them in a boxed manner.
12- pub ( crate ) type DynAsyncCallback =
13- Box < dyn for < ' a > FnMut ( Payload , Client ) -> BoxFuture < ' static , ( ) > + ' static + Send + Sync > ;
13+ pub ( crate ) type DynAsyncCallback = Box <
14+ dyn for < ' a > FnMut ( Payload , Client , Option < AckId > ) -> BoxFuture < ' static , ( ) >
15+ + ' static
16+ + Send
17+ + Sync ,
18+ > ;
1419
1520pub ( crate ) type DynAsyncAnyCallback = Box <
16- dyn for < ' a > FnMut ( Event , Payload , Client ) -> BoxFuture < ' static , ( ) > + ' static + Send + Sync ,
21+ dyn for < ' a > FnMut ( Event , Payload , Client , Option < AckId > ) -> BoxFuture < ' static , ( ) >
22+ + ' static
23+ + Send
24+ + Sync ,
1725> ;
1826
1927pub ( crate ) type DynAsyncReconnectSettingsCallback =
@@ -30,8 +38,10 @@ impl<T> Debug for Callback<T> {
3038}
3139
3240impl Deref for Callback < DynAsyncCallback > {
33- type Target =
34- dyn for < ' a > FnMut ( Payload , Client ) -> BoxFuture < ' static , ( ) > + ' static + Sync + Send ;
41+ type Target = dyn for < ' a > FnMut ( Payload , Client , Option < AckId > ) -> BoxFuture < ' static , ( ) >
42+ + ' static
43+ + Sync
44+ + Send ;
3545
3646 fn deref ( & self ) -> & Self :: Target {
3747 self . inner . as_ref ( )
@@ -45,19 +55,34 @@ impl DerefMut for Callback<DynAsyncCallback> {
4555}
4656
4757impl Callback < DynAsyncCallback > {
48- pub ( crate ) fn new < T > ( callback : T ) -> Self
58+ pub ( crate ) fn new_with_ack < T > ( mut callback : T ) -> Self
4959 where
50- T : for < ' a > FnMut ( Payload , Client ) -> BoxFuture < ' static , ( ) > + ' static + Sync + Send ,
60+ T : for < ' a > FnMut ( Payload , Client , AckId ) -> BoxFuture < ' static , ( ) > + ' static + Sync + Send ,
5161 {
5262 Callback {
53- inner : Box :: new ( callback) ,
63+ inner : Box :: new ( move |p, c, a| match a {
64+ Some ( a) => callback ( p, c, a) . boxed ( ) ,
65+ None => std:: future:: ready ( ( ) ) . boxed ( ) ,
66+ } ) ,
67+ }
68+ }
69+
70+ pub ( crate ) fn new < T , Fut > ( mut callback : T ) -> Self
71+ where
72+ T : FnMut ( Payload , Client ) -> Fut + Sync + Send + ' static ,
73+ Fut : Future < Output = ( ) > + ' static + Send ,
74+ {
75+ Callback {
76+ inner : Box :: new ( move |p, c, _a| callback ( p, c) . boxed ( ) ) ,
5477 }
5578 }
5679}
5780
5881impl Deref for Callback < DynAsyncAnyCallback > {
59- type Target =
60- dyn for < ' a > FnMut ( Event , Payload , Client ) -> BoxFuture < ' static , ( ) > + ' static + Sync + Send ;
82+ type Target = dyn for < ' a > FnMut ( Event , Payload , Client , Option < AckId > ) -> BoxFuture < ' static , ( ) >
83+ + ' static
84+ + Sync
85+ + Send ;
6186
6287 fn deref ( & self ) -> & Self :: Target {
6388 self . inner . as_ref ( )
@@ -71,12 +96,28 @@ impl DerefMut for Callback<DynAsyncAnyCallback> {
7196}
7297
7398impl Callback < DynAsyncAnyCallback > {
74- pub ( crate ) fn new < T > ( callback : T ) -> Self
99+ pub ( crate ) fn new_with_ack < T > ( mut callback : T ) -> Self
75100 where
76- T : for < ' a > FnMut ( Event , Payload , Client ) -> BoxFuture < ' static , ( ) > + ' static + Sync + Send ,
101+ T : for < ' a > FnMut ( Event , Payload , Client , AckId ) -> BoxFuture < ' static , ( ) >
102+ + ' static
103+ + Sync
104+ + Send ,
77105 {
78106 Callback {
79- inner : Box :: new ( callback) ,
107+ inner : Box :: new ( move |e, p, c, a| match a {
108+ Some ( a) => callback ( e, p, c, a) . boxed ( ) ,
109+ None => std:: future:: ready ( ( ) ) . boxed ( ) ,
110+ } ) ,
111+ }
112+ }
113+
114+ pub ( crate ) fn new < T , Fut > ( mut callback : T ) -> Self
115+ where
116+ T : FnMut ( Event , Payload , Client ) -> Fut + Sync + Send + ' static ,
117+ Fut : Future < Output = ( ) > + ' static + Send ,
118+ {
119+ Callback {
120+ inner : Box :: new ( move |e, p, c, _a| callback ( e, p, c) . boxed ( ) ) ,
80121 }
81122 }
82123}
0 commit comments