Skip to content

Commit 9a30f73

Browse files
authored
Merge pull request #49 from lynnswap:codex/fix-docsearch-pipe-ordering
refactor(proxy-runtime): replace upstream generations with slot sessions
2 parents e2a11be + 7a98124 commit 9a30f73

19 files changed

Lines changed: 1155 additions & 241 deletions

Sources/ProxyHTTPTransport/HTTPPostService.swift

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -665,7 +665,8 @@ package final class HTTPPostService: Sendable {
665665

666666
sessionManager.sendUpstream(
667667
prepared.transform.upstreamData,
668-
upstreamIndex: prepared.upstreamIndex
668+
upstreamIndex: prepared.upstreamIndex,
669+
ensureRunning: false
669670
)
670671
return makeImmediateLeaseResolution(
671672
.empty(status: .accepted, sessionID: sessionID),

Sources/ProxyHTTPTransport/MCPForwardingService.swift

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,8 @@ package struct MCPForwardingService: Sendable {
106106

107107
sessionManager.sendUpstream(
108108
prepared.transform.upstreamData,
109-
upstreamIndex: prepared.upstreamIndex
109+
upstreamIndex: prepared.upstreamIndex,
110+
ensureRunning: false
110111
)
111112
return StartedRequest(
112113
transform: prepared.transform,
Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
import Foundation
2+
3+
package actor ManagedUpstreamSlot: UpstreamSlotControlling {
4+
private final class StartAttempt: @unchecked Sendable {
5+
let task: Task<any UpstreamSession, Error>
6+
7+
init(task: Task<any UpstreamSession, Error>) {
8+
self.task = task
9+
}
10+
}
11+
12+
private final class RunningSessionBox: @unchecked Sendable {
13+
let session: any UpstreamSession
14+
var eventTask: Task<Void, Never>?
15+
16+
init(session: any UpstreamSession) {
17+
self.session = session
18+
}
19+
}
20+
21+
package nonisolated let events: AsyncStream<UpstreamEvent>
22+
private let continuation: AsyncStream<UpstreamEvent>.Continuation
23+
private let factory: any UpstreamSessionFactory
24+
private var pendingStart: StartAttempt?
25+
private var current: RunningSessionBox?
26+
private var isShutdown = false
27+
28+
package init(
29+
factory: any UpstreamSessionFactory,
30+
startImmediately: Bool = false
31+
) {
32+
self.factory = factory
33+
34+
var streamContinuation: AsyncStream<UpstreamEvent>.Continuation!
35+
self.events = AsyncStream { continuation in
36+
streamContinuation = continuation
37+
}
38+
self.continuation = streamContinuation
39+
40+
if startImmediately {
41+
Task { [weak self] in
42+
await self?.start()
43+
}
44+
}
45+
}
46+
47+
package func start() async {
48+
beginStartIfNeeded()
49+
}
50+
51+
package func stop() async {
52+
isShutdown = true
53+
54+
let running = current
55+
current = nil
56+
57+
let pending = pendingStart
58+
pendingStart = nil
59+
pending?.task.cancel()
60+
61+
continuation.finish()
62+
63+
if let running {
64+
await running.session.stop()
65+
}
66+
}
67+
68+
package func send(_ data: Data) async -> UpstreamSendResult {
69+
guard !isShutdown else {
70+
return .overloaded
71+
}
72+
73+
if let current {
74+
return await current.session.send(data)
75+
}
76+
77+
guard let pendingStart else {
78+
return .overloaded
79+
}
80+
81+
do {
82+
let session = try await pendingStart.task.value
83+
guard let running = await claimStartedSessionIfNeeded(
84+
session: session,
85+
attempt: pendingStart
86+
) else {
87+
return .overloaded
88+
}
89+
return await running.session.send(data)
90+
} catch {
91+
return .overloaded
92+
}
93+
}
94+
95+
private func beginStartIfNeeded() {
96+
guard !isShutdown, current == nil, pendingStart == nil else {
97+
return
98+
}
99+
100+
let attempt = StartAttempt(
101+
task: Task {
102+
try await factory.startSession()
103+
}
104+
)
105+
pendingStart = attempt
106+
107+
Task { [weak self, attempt] in
108+
await self?.finishStartAttempt(attempt)
109+
}
110+
}
111+
112+
private func finishStartAttempt(_ attempt: StartAttempt) async {
113+
do {
114+
let session = try await attempt.task.value
115+
_ = await claimStartedSessionIfNeeded(session: session, attempt: attempt)
116+
} catch {
117+
guard pendingStart === attempt else {
118+
return
119+
}
120+
pendingStart = nil
121+
}
122+
}
123+
124+
private func claimStartedSessionIfNeeded(
125+
session: any UpstreamSession,
126+
attempt: StartAttempt
127+
) async -> RunningSessionBox? {
128+
if let current {
129+
return current.session === session ? current : nil
130+
}
131+
132+
guard pendingStart === attempt else {
133+
await session.stop()
134+
return nil
135+
}
136+
pendingStart = nil
137+
138+
guard !isShutdown else {
139+
await session.stop()
140+
return nil
141+
}
142+
143+
let running = RunningSessionBox(session: session)
144+
current = running
145+
running.eventTask = Task { [weak self, running] in
146+
for await event in session.events {
147+
await self?.handleSessionEvent(event, from: running)
148+
}
149+
await self?.handleSessionStreamFinished(from: running)
150+
}
151+
return running
152+
}
153+
154+
private func handleSessionEvent(
155+
_ event: UpstreamEvent,
156+
from running: RunningSessionBox
157+
) {
158+
guard current === running else {
159+
return
160+
}
161+
162+
continuation.yield(event)
163+
164+
switch event {
165+
case .stdoutProtocolViolation, .exit:
166+
current = nil
167+
case .message, .stderr, .stdoutBufferSize:
168+
break
169+
}
170+
}
171+
172+
private func handleSessionStreamFinished(from running: RunningSessionBox) {
173+
guard current === running else {
174+
return
175+
}
176+
current = nil
177+
}
178+
}
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
import Dispatch
2+
import Foundation
3+
import NIOConcurrencyHelpers
4+
5+
package final class OrderedPipeReader: @unchecked Sendable {
6+
package nonisolated let chunks: AsyncStream<Data>
7+
private let continuation: AsyncStream<Data>.Continuation
8+
private let fileHandle: FileHandle
9+
private let queue: DispatchQueue
10+
private let state = NIOLockedValueBox(State())
11+
12+
private struct State: Sendable {
13+
var isStarted = false
14+
var isFinished = false
15+
var source: DispatchSourceRead?
16+
}
17+
18+
package init(
19+
fileHandle: FileHandle,
20+
label: String
21+
) {
22+
self.fileHandle = fileHandle
23+
self.queue = DispatchQueue(label: label)
24+
25+
var streamContinuation: AsyncStream<Data>.Continuation!
26+
self.chunks = AsyncStream { continuation in
27+
streamContinuation = continuation
28+
}
29+
self.continuation = streamContinuation
30+
}
31+
32+
package func start() {
33+
let source = state.withLockedValue { state -> DispatchSourceRead? in
34+
guard !state.isStarted, !state.isFinished else {
35+
return nil
36+
}
37+
state.isStarted = true
38+
let source = DispatchSource.makeReadSource(
39+
fileDescriptor: fileHandle.fileDescriptor,
40+
queue: queue
41+
)
42+
state.source = source
43+
return source
44+
}
45+
guard let source else { return }
46+
47+
source.setEventHandler { [weak self] in
48+
self?.consumeAvailableData()
49+
}
50+
source.setCancelHandler { [fileHandle] in
51+
try? fileHandle.close()
52+
}
53+
source.resume()
54+
}
55+
56+
package func stop() {
57+
finish()
58+
}
59+
60+
private func consumeAvailableData() {
61+
guard state.withLockedValue({ !$0.isFinished }) else {
62+
return
63+
}
64+
65+
let chunk = fileHandle.availableData
66+
if chunk.isEmpty {
67+
finish()
68+
return
69+
}
70+
71+
guard state.withLockedValue({ !$0.isFinished }) else {
72+
return
73+
}
74+
continuation.yield(chunk)
75+
}
76+
77+
private func finish() {
78+
let result = state.withLockedValue { state -> (shouldFinish: Bool, source: DispatchSourceRead?) in
79+
guard !state.isFinished else {
80+
return (false, nil)
81+
}
82+
state.isFinished = true
83+
let source = state.source
84+
state.source = nil
85+
return (true, source)
86+
}
87+
guard result.shouldFinish else { return }
88+
continuation.finish()
89+
if let source = result.source {
90+
source.cancel()
91+
} else {
92+
try? fileHandle.close()
93+
}
94+
}
95+
}

Sources/ProxyRuntime/ProcessRunner.swift

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,27 +22,25 @@ final class DispatchGroupLeaveGuard: @unchecked Sendable {
2222
}
2323

2424
private final class PipeCollector: @unchecked Sendable {
25-
private let fileHandle: FileHandle
25+
private let reader: OrderedPipeReader
2626
private let drainGuard: DispatchGroupLeaveGuard
2727
private let buffer = NIOLockedValueBox(Data())
28+
private var task: Task<Void, Never>?
2829

29-
init(fileHandle: FileHandle, drainGroup: DispatchGroup) {
30-
self.fileHandle = fileHandle
30+
init(fileHandle: FileHandle, drainGroup: DispatchGroup, label: String) {
31+
self.reader = OrderedPipeReader(fileHandle: fileHandle, label: label)
3132
self.drainGuard = DispatchGroupLeaveGuard(group: drainGroup)
3233
}
3334

3435
func start() {
35-
fileHandle.readabilityHandler = { [buffer, drainGuard] handle in
36-
let chunk = handle.availableData
37-
if chunk.isEmpty {
38-
handle.readabilityHandler = nil
39-
try? handle.close()
40-
drainGuard.leaveIfNeeded()
41-
return
42-
}
43-
buffer.withLockedValue { data in
44-
data.append(chunk)
36+
reader.start()
37+
task = Task { [buffer, drainGuard, reader] in
38+
for await chunk in reader.chunks {
39+
buffer.withLockedValue { data in
40+
data.append(chunk)
41+
}
4542
}
43+
drainGuard.leaveIfNeeded()
4644
}
4745
}
4846

@@ -51,8 +49,9 @@ private final class PipeCollector: @unchecked Sendable {
5149
}
5250

5351
func cancel() {
54-
fileHandle.readabilityHandler = nil
55-
try? fileHandle.close()
52+
reader.stop()
53+
task?.cancel()
54+
task = nil
5655
drainGuard.leaveIfNeeded()
5756
}
5857
}
@@ -99,11 +98,13 @@ package struct ProcessRunner: ProcessRunning {
9998
let drainGroup = DispatchGroup()
10099
let stdoutCollector = PipeCollector(
101100
fileHandle: stdoutPipe.fileHandleForReading,
102-
drainGroup: drainGroup
101+
drainGroup: drainGroup,
102+
label: "XcodeMCPProxy.ProcessRunner.stdout"
103103
)
104104
let stderrCollector = PipeCollector(
105105
fileHandle: stderrPipe.fileHandleForReading,
106-
drainGroup: drainGroup
106+
drainGroup: drainGroup,
107+
label: "XcodeMCPProxy.ProcessRunner.stderr"
107108
)
108109
let didResume = NIOLockedValueBox(false)
109110
let resumeOnce: @Sendable (Result<ProcessOutput, Error>) -> Void = { result in

Sources/ProxyRuntime/ResponseCorrelationStore.swift

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ extension RuntimeCoordinator {
77
config: ProxyConfig,
88
sharedSessionID: String?,
99
count: Int
10-
) -> [UpstreamProcess] {
10+
) -> [ManagedUpstreamSlot] {
1111
var environment = ProcessInfo.processInfo.environment
1212
environment.removeValue(forKey: "XCODE_PID")
1313
if let sharedSessionID, !sharedSessionID.isEmpty {
@@ -30,12 +30,14 @@ extension RuntimeCoordinator {
3030
}()
3131
)
3232
if count <= 1 {
33-
return [UpstreamProcess(config: upstreamConfig)]
33+
return [ManagedUpstreamSlot(factory: UpstreamProcess(config: upstreamConfig), startImmediately: true)]
3434
}
35-
var upstreams: [UpstreamProcess] = []
35+
var upstreams: [ManagedUpstreamSlot] = []
3636
upstreams.reserveCapacity(count)
3737
for _ in 0..<count {
38-
upstreams.append(UpstreamProcess(config: upstreamConfig))
38+
upstreams.append(
39+
ManagedUpstreamSlot(factory: UpstreamProcess(config: upstreamConfig), startImmediately: true)
40+
)
3941
}
4042
return upstreams
4143
}

Sources/ProxyRuntime/RuntimeCoordinator+Health.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,7 @@ extension RuntimeCoordinator {
264264

265265
let request = makeInternalInitializeRequest(id: upstreamID)
266266
if let data = try? JSONSerialization.data(withJSONObject: request, options: []) {
267-
sendUpstream(data, upstreamIndex: upstreamIndex)
267+
sendUpstream(data, upstreamIndex: upstreamIndex, ensureRunning: true)
268268
} else {
269269
clearUpstreamState(upstreamIndex: upstreamIndex)
270270
}

0 commit comments

Comments
 (0)