Skip to content

Commit e6b1275

Browse files
Refactor Codex client and improve testing utilities
- Updated `CodexCoreUtilitiesTests` to use a shim path for `CodexPath` and improved stdout reading with cancellation tokens. - Added `CreateSession_SendManyQuickTurns_DoesNotDropDeltaText` test in `InMemoryCodexClientTests` to validate quick message handling. - Introduced new methods and `TurnNotification` struct in `CodexClient` for better turn notification management. - Replaced `OnDelta` and `OnCompleted` methods with `RouteTurnNotification` for simplified logic. - Enhanced `TurnTracker` to manage notifications in a thread-safe manner. - Updated binary files: `Buffaly.CodexEmbedded.Cli.dll`, `Buffaly.CodexEmbedded.Core.dll`, and `Buffaly.CodexEmbedded.Web.dll`.
1 parent 69cf4f4 commit e6b1275

6 files changed

Lines changed: 205 additions & 26 deletions

File tree

Buffaly.CodexEmbedded.Core.Tests/CodexCoreUtilitiesTests.cs

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -98,19 +98,27 @@ public async Task ProcessJsonlTransport_StartAsync_ResolvesWindowsCodexCmdShim()
9898
"@echo off\r\necho arg=%1\r\n",
9999
System.Text.Encoding.ASCII);
100100

101-
var existingPath = Environment.GetEnvironmentVariable("PATH") ?? string.Empty;
102101
var options = new CodexClientOptions
103102
{
104-
CodexPath = "codex",
105-
WorkingDirectory = root,
106-
EnvironmentVariables = new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase)
107-
{
108-
["PATH"] = $"{root};{existingPath}"
109-
}
103+
// Use the explicit shim path so this test never launches a real codex install.
104+
CodexPath = shimPath,
105+
WorkingDirectory = root
110106
};
111107

112108
await using var transport = await ProcessJsonlTransport.StartAsync(options, CancellationToken.None);
113-
var stdoutLine = await transport.ReadStdoutLineAsync(CancellationToken.None);
109+
string? stdoutLine;
110+
using (var readCts = new CancellationTokenSource(TimeSpan.FromSeconds(5)))
111+
{
112+
try
113+
{
114+
stdoutLine = await transport.ReadStdoutLineAsync(readCts.Token);
115+
}
116+
catch (OperationCanceledException)
117+
{
118+
Assert.Fail("Timed out waiting for cmd shim stdout.");
119+
return;
120+
}
121+
}
114122

115123
Assert.IsTrue((stdoutLine ?? string.Empty).Contains("arg=app-server", StringComparison.OrdinalIgnoreCase));
116124
}

Buffaly.CodexEmbedded.Core.Tests/InMemoryCodexClientTests.cs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,39 @@ public async Task CreateAttachAndSend_WorksAgainstInMemoryTransport()
4949
}
5050
}
5151

52+
[TestMethod]
53+
public async Task CreateSession_SendManyQuickTurns_DoesNotDropDeltaText()
54+
{
55+
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
56+
57+
await using var transport = new InMemoryJsonlTransport();
58+
var server = new FakeAppServer(transport);
59+
var serverTask = server.RunAsync(cts.Token);
60+
61+
await using var client = await CodexClient.ConnectAsync(transport, cts.Token);
62+
var session = await client.CreateSessionAsync(new CodexSessionCreateOptions
63+
{
64+
Cwd = "C:\\tmp",
65+
Model = "fake-model"
66+
}, cts.Token);
67+
68+
for (var i = 0; i < 25; i++)
69+
{
70+
var result = await session.SendMessageAsync($"msg-{i}", cancellationToken: cts.Token);
71+
Assert.AreEqual("completed", result.Status);
72+
Assert.AreEqual(i == 0 ? "Hello from fake server." : "Second turn ok.", result.Text);
73+
}
74+
75+
cts.Cancel();
76+
try
77+
{
78+
await serverTask;
79+
}
80+
catch
81+
{
82+
}
83+
}
84+
5285
private sealed class FakeAppServer
5386
{
5487
private readonly InMemoryJsonlTransport _transport;

Buffaly.CodexEmbedded.Core/CodexClient.cs

Lines changed: 156 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ public sealed class CodexClient : IAsyncDisposable
1111
private readonly JsonRpcJsonlClient _rpc;
1212
private readonly SemaphoreSlim _initializeLock = new(1, 1);
1313
private readonly ConcurrentDictionary<string, TurnTracker> _turnsByTurnId = new(StringComparer.Ordinal);
14+
private readonly object _bufferedTurnNotificationsLock = new();
15+
private readonly Dictionary<string, List<TurnNotification>> _bufferedTurnNotificationsByTurnId = new(StringComparer.Ordinal);
1416
private readonly Func<CodexServerRequest, CancellationToken, Task<object?>>? _serverRequestHandler;
1517
private bool _initialized;
1618

@@ -202,6 +204,60 @@ await _rpc.SendRequestAsync(
202204
return null;
203205
}
204206

207+
private void RouteTurnNotification(string turnId, TurnNotification notification)
208+
{
209+
if (_turnsByTurnId.TryGetValue(turnId, out var tracker))
210+
{
211+
tracker.EnqueueNotification(notification);
212+
return;
213+
}
214+
215+
BufferTurnNotification(turnId, notification);
216+
217+
// Close the small race where a tracker is added after the first lookup.
218+
if (_turnsByTurnId.TryGetValue(turnId, out tracker))
219+
{
220+
var buffered = TakeBufferedTurnNotifications(turnId);
221+
if (buffered.Count == 0)
222+
{
223+
return;
224+
}
225+
226+
foreach (var bufferedNotification in buffered)
227+
{
228+
tracker.EnqueueNotification(bufferedNotification);
229+
}
230+
}
231+
}
232+
233+
private void BufferTurnNotification(string turnId, TurnNotification notification)
234+
{
235+
lock (_bufferedTurnNotificationsLock)
236+
{
237+
if (!_bufferedTurnNotificationsByTurnId.TryGetValue(turnId, out var notifications))
238+
{
239+
notifications = new List<TurnNotification>();
240+
_bufferedTurnNotificationsByTurnId[turnId] = notifications;
241+
}
242+
243+
notifications.Add(notification);
244+
}
245+
}
246+
247+
private IReadOnlyList<TurnNotification> TakeBufferedTurnNotifications(string turnId)
248+
{
249+
lock (_bufferedTurnNotificationsLock)
250+
{
251+
if (!_bufferedTurnNotificationsByTurnId.TryGetValue(turnId, out var notifications))
252+
{
253+
return Array.Empty<TurnNotification>();
254+
}
255+
256+
_bufferedTurnNotificationsByTurnId.Remove(turnId);
257+
return notifications;
258+
}
259+
}
260+
205261
internal async Task<CodexTurnResult> SendMessageAsync(
206262
string threadId,
207263
string text,
@@ -262,6 +318,7 @@ internal async Task<CodexTurnResult> SendMessageAsync(
262318
{
263319
throw new InvalidOperationException($"Duplicate turn id '{turnId}'.");
264320
}
321+
tracker.Prime(TakeBufferedTurnNotifications(turnId));
265322

266323
try
267324
{
@@ -290,9 +347,9 @@ private void HandleNotification(string method, JsonElement message)
290347
var turnId = JsonPath.TryGetString(paramsElement, "turnId");
291348
var threadId = JsonPath.TryGetString(paramsElement, "threadId");
292349

293-
if (!string.IsNullOrEmpty(delta) && !string.IsNullOrEmpty(turnId) && _turnsByTurnId.TryGetValue(turnId, out var tracker))
350+
if (!string.IsNullOrEmpty(delta) && !string.IsNullOrEmpty(turnId))
294351
{
295-
tracker.OnDelta(delta, threadId);
352+
RouteTurnNotification(turnId, TurnNotification.ForDelta(threadId, delta));
296353
}
297354
return;
298355
}
@@ -311,11 +368,9 @@ private void HandleNotification(string method, JsonElement message)
311368

312369
var status = JsonPath.TryGetString(paramsElement, "turn", "status") ?? "unknown";
313370
var errorMessage = JsonPath.TryGetString(paramsElement, "turn", "error", "message");
371+
var threadId = JsonPath.TryGetString(paramsElement, "threadId");
314372

315-
if (_turnsByTurnId.TryGetValue(turnId, out var tracker))
316-
{
317-
tracker.OnCompleted(status, errorMessage);
318-
}
373+
RouteTurnNotification(turnId, TurnNotification.ForCompleted(threadId, status, errorMessage));
319374

320375
return;
321376
}
@@ -391,11 +446,47 @@ public async ValueTask DisposeAsync()
391446
await _rpc.DisposeAsync();
392447
}
393448

449+
private enum TurnNotificationKind
450+
{
451+
Delta,
452+
Completed
453+
}
454+
455+
private readonly struct TurnNotification
456+
{
457+
public TurnNotificationKind Kind { get; }
458+
public string? ThreadId { get; }
459+
public string? Delta { get; }
460+
public string Status { get; }
461+
public string? ErrorMessage { get; }
462+
463+
private TurnNotification(TurnNotificationKind kind, string? threadId, string? delta, string status, string? errorMessage)
464+
{
465+
Kind = kind;
466+
ThreadId = threadId;
467+
Delta = delta;
468+
Status = status;
469+
ErrorMessage = errorMessage;
470+
}
471+
472+
public static TurnNotification ForDelta(string? threadId, string delta)
473+
{
474+
return new TurnNotification(TurnNotificationKind.Delta, threadId, delta, "unknown", errorMessage: null);
475+
}
476+
477+
public static TurnNotification ForCompleted(string? threadId, string status, string? errorMessage)
478+
{
479+
return new TurnNotification(TurnNotificationKind.Completed, threadId, delta: null, status, errorMessage);
480+
}
481+
}
482+
394483
private sealed class TurnTracker
395484
{
396485
private readonly StringBuilder _text = new();
397486
private readonly object _lock = new();
487+
private readonly List<TurnNotification> _queuedNotifications = new();
398488
private string? _lastError;
489+
private bool _isPrimed;
399490

400491
public string ThreadId { get; }
401492
public string TurnId { get; }
@@ -409,14 +500,48 @@ public TurnTracker(string threadId, string turnId, IProgress<CodexDelta>? progre
409500
Progress = progress;
410501
}
411502

412-
public void OnDelta(string delta, string? threadId)
503+
public void Prime(IReadOnlyList<TurnNotification> bufferedNotifications)
413504
{
414505
lock (_lock)
415506
{
416-
_text.Append(delta);
507+
if (_isPrimed)
508+
{
509+
return;
510+
}
511+
512+
if (bufferedNotifications.Count > 0)
513+
{
514+
if (_queuedNotifications.Count == 0)
515+
{
516+
_queuedNotifications.AddRange(bufferedNotifications);
517+
}
518+
else
519+
{
520+
_queuedNotifications.InsertRange(0, bufferedNotifications);
521+
}
522+
}
523+
524+
_isPrimed = true;
525+
foreach (var notification in _queuedNotifications)
526+
{
527+
ApplyNotificationLocked(notification);
528+
}
529+
_queuedNotifications.Clear();
417530
}
531+
}
418532

419-
Progress?.Report(new CodexDelta(threadId ?? ThreadId, TurnId, delta));
533+
public void EnqueueNotification(TurnNotification notification)
534+
{
535+
lock (_lock)
536+
{
537+
if (!_isPrimed)
538+
{
539+
_queuedNotifications.Add(notification);
540+
return;
541+
}
542+
543+
ApplyNotificationLocked(notification);
544+
}
420545
}
421546

422547
public void OnError(string message)
@@ -427,18 +552,31 @@ public void OnError(string message)
427552
}
428553
}
429554

430-
public void OnCompleted(string status, string? errorMessage)
555+
private void ApplyNotificationLocked(TurnNotification notification)
431556
{
432-
string aggregated;
433-
string? lastError;
434-
lock (_lock)
557+
switch (notification.Kind)
435558
{
436-
aggregated = _text.ToString();
437-
lastError = _lastError;
438-
}
559+
case TurnNotificationKind.Delta:
560+
{
561+
if (string.IsNullOrEmpty(notification.Delta))
562+
{
563+
return;
564+
}
439565

440-
var finalError = errorMessage ?? lastError;
441-
Completion.TrySetResult(new CodexTurnResult(ThreadId, TurnId, status, finalError, aggregated));
566+
_text.Append(notification.Delta);
567+
Progress?.Report(new CodexDelta(notification.ThreadId ?? ThreadId, TurnId, notification.Delta));
568+
return;
569+
}
570+
case TurnNotificationKind.Completed:
571+
{
572+
var aggregated = _text.ToString();
573+
var finalError = notification.ErrorMessage ?? _lastError;
574+
Completion.TrySetResult(new CodexTurnResult(ThreadId, TurnId, notification.Status, finalError, aggregated));
575+
return;
576+
}
577+
default:
578+
return;
579+
}
442580
}
443581
}
444582
}
0 Bytes
Binary file not shown.
2.5 KB
Binary file not shown.
0 Bytes
Binary file not shown.

0 commit comments

Comments
 (0)