Skip to content

Commit 09fcce1

Browse files
committed
add async support for RegisterQueue in c#
1 parent a36dd50 commit 09fcce1

3 files changed

Lines changed: 48 additions & 11 deletions

File tree

dotnet/CallbackRegistry.cs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,14 @@ public bool TryGetCallback<T>(int id, out TaskCompletionSource<T>? tcs)
2323
tcs = typedTcs;
2424
return true;
2525
} else {
26+
#if DEBUG
2627
Console.WriteLine("Failed to get callback for id: " + id + " is of wrong type " + obj.GetType() + " expected " + typeof(TaskCompletionSource<T>));
28+
#endif
2729
}
2830
} else {
31+
#if DEBUG
2932
Console.WriteLine("Failed to get callback for id: " + id);
33+
#endif
3034
}
3135
tcs = null;
3236
return false;
@@ -39,10 +43,14 @@ public bool TryGetCallback(int id, out TaskCompletionSource? tcs)
3943
tcs = typedTcs;
4044
return true;
4145
} else {
46+
#if DEBUG
4247
Console.WriteLine("Failed to get callback for id: " + id + " is of wrong type " + obj.GetType() + " expected " + typeof(TaskCompletionSource));
48+
#endif
4349
}
4450
} else {
51+
#if DEBUG
4552
Console.WriteLine("Failed to get callback for id: " + id);
53+
#endif
4654
}
4755
tcs = null;
4856
return false;
@@ -128,10 +136,14 @@ public bool TryGetCallback<T>(int id, out Action<T>? tcs)
128136
tcs = typedTcs;
129137
return true;
130138
} else {
139+
#if DEBUG
131140
Console.WriteLine("Failed to get callback for id: " + id + " is of wrong type " + obj.GetType() + " expected " + typeof(Action<T>));
141+
#endif
132142
}
133143
} else {
144+
#if DEBUG
134145
Console.WriteLine("Failed to get callback for id: " + id);
146+
#endif
135147
}
136148
tcs = null;
137149
return false;
@@ -145,10 +157,14 @@ public bool TryGetCallback(int id, out Action? tcs)
145157
tcs = typedTcs;
146158
return true;
147159
} else {
160+
#if DEBUG
148161
Console.WriteLine("Failed to get callback for id: " + id + " is of wrong type " + obj.GetType() + " expected " + typeof(Action));
162+
#endif
149163
}
150164
} else {
165+
#if DEBUG
151166
Console.WriteLine("Failed to get callback for id: " + id);
167+
#endif
152168
}
153169
tcs = null;
154170
return false;
@@ -214,7 +230,9 @@ public bool TryGetCallback<TIn, TOut>(int id, out Func<TIn, TOut>? func)
214230
func = typedFunc;
215231
return true;
216232
}
233+
#if DEBUG
217234
Console.WriteLine($"Failed to get callback for id: {id} is of wrong type {obj.GetType()} expected {typeof(Func<TIn, TOut>)}");
235+
#endif
218236
}
219237
func = null;
220238
return false;

dotnet/Client.cs

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1235,7 +1235,7 @@ public Client(string libPath = "")
12351235
_WatchCallbackDelegate = _WatchCallback;
12361236
_WatchEventCallbackDelegate = _WatchEventCallback;
12371237
_DropCollectionCallbackDelegate = _DropCollectionCallback;
1238-
_QueueEventCallbackDelegate = _QueueEventCallback;
1238+
_QueueEventCallbackDelegate = _QueueEventCallbackWrapper;
12391239
_ExchangeEventCallbackDelegate = _ExchangeEventCallback;
12401240
_RpcResponseCallbackDelegate = _RpcResponseCallback;
12411241
_CustomCommandCallbackDelegate = _CustomCommandCallback;
@@ -3086,7 +3086,11 @@ public void UnWatch(string watchid)
30863086
Marshal.FreeHGlobal(watchidPtr);
30873087
}
30883088
}
3089-
IntPtr _QueueEventCallback(IntPtr QueueEventWrapperptr)
3089+
private IntPtr _QueueEventCallbackWrapper(IntPtr QueueEventWrapperptr)
3090+
{
3091+
return _QueueEventCallback(QueueEventWrapperptr).GetAwaiter().GetResult();
3092+
}
3093+
private async Task<IntPtr> _QueueEventCallback(IntPtr QueueEventWrapperptr)
30903094
{
30913095
try
30923096
{
@@ -3115,6 +3119,26 @@ IntPtr _QueueEventCallback(IntPtr QueueEventWrapperptr)
31153119
{
31163120
return Marshal.StringToHGlobalAnsi(result);
31173121
}
3122+
} else if (QueueFuncRegistry.TryGetCallback<QueueEvent, Task<string>>(eventObj.request_id, out var asyncFuncHandler))
3123+
{
3124+
if (asyncFuncHandler == null)
3125+
{
3126+
return IntPtr.Zero;
3127+
}
3128+
var queueEvent = new QueueEvent
3129+
{
3130+
queuename = Marshal.PtrToStringAnsi(eventObj.queuename) ?? string.Empty,
3131+
correlation_id = Marshal.PtrToStringAnsi(eventObj.correlation_id) ?? string.Empty,
3132+
replyto = Marshal.PtrToStringAnsi(eventObj.replyto) ?? string.Empty,
3133+
routingkey = Marshal.PtrToStringAnsi(eventObj.routingkey) ?? string.Empty,
3134+
exchangename = Marshal.PtrToStringAnsi(eventObj.exchangename) ?? string.Empty,
3135+
data = Marshal.PtrToStringAnsi(eventObj.data) ?? string.Empty,
3136+
};
3137+
var result = await asyncFuncHandler(queueEvent);
3138+
if (!string.IsNullOrEmpty(result))
3139+
{
3140+
return Marshal.StringToHGlobalAnsi(result);
3141+
}
31183142
}
31193143
return IntPtr.Zero;
31203144
}
@@ -3129,7 +3153,7 @@ IntPtr _QueueEventCallback(IntPtr QueueEventWrapperptr)
31293153
}
31303154
}
31313155

3132-
public string RegisterQueue(string queuename, Func<QueueEvent, string> eventHandler)
3156+
public string RegisterQueue(string queuename, Func<QueueEvent, Task<string>> eventHandler)
31333157
{
31343158
if (eventHandler == null) throw new ArgumentNullException(nameof(eventHandler));
31353159
IntPtr queuenamePtr = Marshal.StringToHGlobalAnsi(queuename);
@@ -3172,7 +3196,7 @@ public string RegisterQueueAction(string queuename, Action<QueueEvent> eventHand
31723196
{
31733197
return RegisterQueue(queuename, (queueEvent) => {
31743198
eventHandler(queueEvent);
3175-
return string.Empty;
3199+
return Task.FromResult(string.Empty);
31763200
});
31773201
}
31783202

dotnet/Program.cs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -647,13 +647,8 @@ await client.CreateCollection("testdotnettscollection", timeseries:
647647
var queueId = client.RegisterQueue("test2queue", e =>
648648
{
649649
Console.WriteLine("Queue event received from " + e.queuename + " with data: " + e.data);
650-
// if(!string.IsNullOrEmpty(e.replyto)) {
651-
// var t = System.Threading.Tasks.Task.Run(() => {
652-
// var message = "{\"payload\": \"Bettina\"}";
653-
// _ = client.QueueMessage(message, e.replyto, striptoken: true, correlation_id: e.correlation_id);
654-
// });
655-
// }
656-
return "{\"payload\": \"Bettina\"}";
650+
//return "{\"payload\": \"Bettina\"}";
651+
return Task.FromResult("{\"payload\": \"Bettina\"}");
657652
});
658653
Console.WriteLine("Queue registered with id: " + queueId);
659654
}

0 commit comments

Comments
 (0)