From ce2e7304a4da835c250c6cdf7fbcb1974d57681e Mon Sep 17 00:00:00 2001 From: fortmarek Date: Tue, 2 Jun 2026 17:53:01 +0200 Subject: [PATCH 1/5] fix: bound concurrent subprocess launches to avoid fd exhaustion MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Each running subprocess holds open pipe file descriptors. Since #249 made the subprocess wait async, launches are no longer implicitly bounded by the cooperative thread pool, so a caller that fans out many concurrent commands can run the process out of file descriptors — surfacing as `NSPOSIXErrorDomain Code=9 "Bad file descriptor"` from `Process.run()`. This is especially easy to hit on CI, where the default soft `RLIMIT_NOFILE` is low (256 on macOS). Introduce an `AsyncResourceLimiter` (a cancellation-aware async semaphore, mirroring tuist/FileSystem) and wrap each launch in it. The limit is derived from the current soft file-descriptor limit, so it adapts when the limit is raised, and it suspends rather than blocks while waiting — preserving #249's fix for thread starvation. Verified with a stress harness: under a forced 256-fd limit, a high-concurrency mix of large-output and short commands previously threw "Bad file descriptor"; with the limiter the same load completes with zero errors. Existing tests (including the concurrency race suite) pass. --- Sources/Command/AsyncResourceLimiter.swift | 92 +++++++++ Sources/Command/CommandRunner.swift | 211 +++++++++++++-------- 2 files changed, 220 insertions(+), 83 deletions(-) create mode 100644 Sources/Command/AsyncResourceLimiter.swift diff --git a/Sources/Command/AsyncResourceLimiter.swift b/Sources/Command/AsyncResourceLimiter.swift new file mode 100644 index 0000000..711005a --- /dev/null +++ b/Sources/Command/AsyncResourceLimiter.swift @@ -0,0 +1,92 @@ +import Foundation + +actor AsyncResourceLimiter { + private struct Waiter { + let id: UUID + let continuation: CheckedContinuation + } + + private let limitProvider: @Sendable () -> Int + private var limit: Int + private var activePermits = 0 + private var waiters: [Waiter] = [] + + init(limit: Int) { + precondition(limit > 0, "AsyncResourceLimiter limit must be greater than zero.") + self.init(limitProvider: { limit }) + } + + init(limitProvider: @escaping @Sendable () -> Int) { + let limit = limitProvider() + precondition(limit > 0, "AsyncResourceLimiter limit must be greater than zero.") + self.limitProvider = limitProvider + self.limit = limit + } + + func withPermit(_ operation: @Sendable () async throws -> T) async throws -> T { + try await acquire() + + do { + try Task.checkCancellation() + let value = try await operation() + release() + return value + } catch { + release() + throw error + } + } + + private func acquire() async throws { + try Task.checkCancellation() + refreshLimit() + + if activePermits < limit, waiters.isEmpty { + activePermits += 1 + return + } + + let waiterID = UUID() + let wasGrantedPermit = await withTaskCancellationHandler { + await withCheckedContinuation { continuation in + waiters.append(Waiter(id: waiterID, continuation: continuation)) + grantAvailablePermits() + } + } onCancel: { + Task { await self.cancelWaiter(id: waiterID) } + } + + guard wasGrantedPermit else { + throw CancellationError() + } + } + + private func release() { + refreshLimit() + activePermits -= 1 + precondition(activePermits >= 0, "AsyncResourceLimiter released more permits than it acquired.") + grantAvailablePermits() + } + + private func refreshLimit() { + let refreshedLimit = limitProvider() + precondition(refreshedLimit > 0, "AsyncResourceLimiter limit must be greater than zero.") + limit = refreshedLimit + } + + private func grantAvailablePermits() { + while activePermits < limit, !waiters.isEmpty { + let waiter = waiters.removeFirst() + activePermits += 1 + waiter.continuation.resume(returning: true) + } + } + + private func cancelWaiter(id: UUID) { + guard let index = waiters.firstIndex(where: { $0.id == id }) else { return } + let waiter = waiters.remove(at: index) + waiter.continuation.resume(returning: false) + refreshLimit() + grantAvailablePermits() + } +} diff --git a/Sources/Command/CommandRunner.swift b/Sources/Command/CommandRunner.swift index c44e251..a588598 100644 --- a/Sources/Command/CommandRunner.swift +++ b/Sources/Command/CommandRunner.swift @@ -6,6 +6,12 @@ import Logging #endif import Path +#if canImport(Darwin) + import Darwin +#elseif canImport(Glibc) + import Glibc +#endif + /** `CommandRunning` is a protocol that declares the interface to run system processes. The main implementation of the protocol is `CommandRunner`. @@ -144,6 +150,43 @@ public struct CommandRunner: CommandRunning, Sendable { self.logger = logger } + /// File descriptors kept in reserve for stdio and other process-wide handles. + private static let reservedFileDescriptors = 32 + /// Approximate number of file descriptors a running subprocess holds (two pipes, plus the + /// transient pipe used to locate the executable). + private static let fileDescriptorsPerProcess = 6 + /// Upper bound on concurrent subprocesses, regardless of how high the file-descriptor limit is. + private static let maximumConcurrentProcesses = 256 + /// Used when the file-descriptor limit can't be determined. + private static let fallbackMaximumConcurrentProcesses = 16 + + /// Bounds the number of concurrently-running subprocesses so the pipe file descriptors they + /// hold can't exhaust the process's file-descriptor table. The limit is derived from the + /// current soft `RLIMIT_NOFILE`, so it adapts if the limit is raised at startup. + private static let processLimiter = AsyncResourceLimiter( + limitProvider: { systemMaximumConcurrentProcesses() } + ) + + private static func systemMaximumConcurrentProcesses() -> Int { + #if os(Windows) + return fallbackMaximumConcurrentProcesses + #else + var resourceLimit = rlimit() + #if canImport(Glibc) + let openFilesResource = Int32(RLIMIT_NOFILE.rawValue) + #else + let openFilesResource = RLIMIT_NOFILE + #endif + guard getrlimit(openFilesResource, &resourceLimit) == 0 else { + return fallbackMaximumConcurrentProcesses + } + let softLimit = Int(clamping: resourceLimit.rlim_cur) + let descriptorsAvailable = max(1, softLimit - reservedFileDescriptors) + let proportionalLimit = max(1, descriptorsAvailable / fileDescriptorsPerProcess) + return min(maximumConcurrentProcesses, proportionalLimit) + #endif + } + // swiftlint:disable:next function_body_length public func run( arguments: [String], @@ -153,114 +196,116 @@ public struct CommandRunner: CommandRunning, Sendable { AsyncThrowingStream(CommandEvent.self, bufferingPolicy: .unbounded) { continuation in Task.detached { do { - let loggerMetadata: Logger.Metadata = ["command": .string(arguments.joined(separator: " "))] - // Resolve the working directory if not passed. `getcwd` can transiently - // return an empty path under concurrent process launches, so fall back to - // letting the child inherit the process working directory instead of failing. - var workingDirectory = workingDirectory - if workingDirectory == nil { - let currentDirectoryPath = FileManager.default.currentDirectoryPath - if !currentDirectoryPath.isEmpty { - workingDirectory = try? .init(validating: currentDirectoryPath) + try await Self.processLimiter.withPermit { + let loggerMetadata: Logger.Metadata = ["command": .string(arguments.joined(separator: " "))] + // Resolve the working directory if not passed. `getcwd` can transiently + // return an empty path under concurrent process launches, so fall back to + // letting the child inherit the process working directory instead of failing. + var workingDirectory = workingDirectory + if workingDirectory == nil { + let currentDirectoryPath = FileManager.default.currentDirectoryPath + if !currentDirectoryPath.isEmpty { + workingDirectory = try? .init(validating: currentDirectoryPath) + } } - } - let collectedStdErr: ThreadSafe = ThreadSafe("") + let collectedStdErr: ThreadSafe = ThreadSafe("") - // Process - let process = Process() - let stdoutPipe = Pipe() - let stderrPipe = Pipe() + // Process + let process = Process() + let stdoutPipe = Pipe() + let stderrPipe = Pipe() - let stdoutTask = Task { - do { - for try await data in stdoutPipe.fileHandleForReading.byteStream() { - continuation.yield(.standardOutput([UInt8](data))) - if let output = String(data: data, encoding: .utf8) { - logger?.debug("\(output)", metadata: loggerMetadata) + let stdoutTask = Task { + do { + for try await data in stdoutPipe.fileHandleForReading.byteStream() { + continuation.yield(.standardOutput([UInt8](data))) + if let output = String(data: data, encoding: .utf8) { + logger?.debug("\(output)", metadata: loggerMetadata) + } } + } catch { + logger?.error("Error reading stdout: \(error)", metadata: loggerMetadata) } - } catch { - logger?.error("Error reading stdout: \(error)", metadata: loggerMetadata) } - } - let stderrTask = Task { - do { - for try await data in stderrPipe.fileHandleForReading.byteStream() { - continuation.yield(.standardError([UInt8](data))) - if let output = String(data: data, encoding: .utf8) { - collectedStdErr.mutate { $0.append(output) } - logger?.error("\(output)", metadata: loggerMetadata) + let stderrTask = Task { + do { + for try await data in stderrPipe.fileHandleForReading.byteStream() { + continuation.yield(.standardError([UInt8](data))) + if let output = String(data: data, encoding: .utf8) { + collectedStdErr.mutate { $0.append(output) } + logger?.error("\(output)", metadata: loggerMetadata) + } } + } catch { + logger?.error("Error reading stderr: \(error)", metadata: loggerMetadata) } - } catch { - logger?.error("Error reading stderr: \(error)", metadata: loggerMetadata) } - } - if let workingDirectory { - process.currentDirectoryURL = URL(fileURLWithPath: workingDirectory.pathString) - } - process.standardOutput = stdoutPipe - process.standardError = stderrPipe - process.standardInput = FileHandle.standardInput - process.environment = environment + if let workingDirectory { + process.currentDirectoryURL = URL(fileURLWithPath: workingDirectory.pathString) + } + process.standardOutput = stdoutPipe + process.standardError = stderrPipe + process.standardInput = FileHandle.standardInput + process.environment = environment - let processArguments = Array(arguments.dropFirst()) - process.arguments = processArguments + let processArguments = Array(arguments.dropFirst()) + process.arguments = processArguments - let executable = try lookupExecutable(firstArgument: arguments.first) - process.executableURL = executable + let executable = try lookupExecutable(firstArgument: arguments.first) + process.executableURL = executable - logger?.debug("Running sub-process", metadata: loggerMetadata) + logger?.debug("Running sub-process", metadata: loggerMetadata) - let threadSafeProcess = ThreadSafe(process) + let threadSafeProcess = ThreadSafe(process) - continuation.onTermination = { termination in - switch termination { - case .cancelled: - if threadSafeProcess.value.isRunning { - threadSafeProcess.value.terminate() + continuation.onTermination = { termination in + switch termination { + case .cancelled: + if threadSafeProcess.value.isRunning { + threadSafeProcess.value.terminate() + } + default: + break } - default: - break } - } - try await withCheckedThrowingContinuation { (processCompletion: CheckedContinuation) in - process.terminationHandler = { _ in - processCompletion.resume() - } - do { - try process.run() - } catch { - process.terminationHandler = nil - processCompletion.resume(throwing: error) + try await withCheckedThrowingContinuation { (processCompletion: CheckedContinuation) in + process.terminationHandler = { _ in + processCompletion.resume() + } + do { + try process.run() + } catch { + process.terminationHandler = nil + processCompletion.resume(throwing: error) + } } - } - await stdoutTask.value - await stderrTask.value + await stdoutTask.value + await stderrTask.value - try? stdoutPipe.fileHandleForReading.close() - try? stderrPipe.fileHandleForReading.close() + try? stdoutPipe.fileHandleForReading.close() + try? stderrPipe.fileHandleForReading.close() - switch process.terminationReason { - case .exit: - if process.terminationStatus != 0 { - throw CommandError.terminated( - process.terminationStatus, - stderr: collectedStdErr.value, - command: arguments - ) - } - case .uncaughtSignal: - if process.terminationStatus != 0 { - throw CommandError.signalled(process.terminationStatus, command: arguments) + switch process.terminationReason { + case .exit: + if process.terminationStatus != 0 { + throw CommandError.terminated( + process.terminationStatus, + stderr: collectedStdErr.value, + command: arguments + ) + } + case .uncaughtSignal: + if process.terminationStatus != 0 { + throw CommandError.signalled(process.terminationStatus, command: arguments) + } + @unknown default: + break } - @unknown default: - break } continuation.finish() } catch { From 3c943730df24c740429c887a3a8d14e3e7aec534 Mon Sep 17 00:00:00 2001 From: fortmarek Date: Tue, 2 Jun 2026 18:10:07 +0200 Subject: [PATCH 2/5] test: add red/green regression test for subprocess concurrency bound Adds `boundsConcurrentSubprocessLaunches`, which runs many commands through a runner with a small cap and asserts the number of subprocesses alive at once never exceeds it (observed via per-process marker files). It fails without the limiter (peak == fan-out) and passes with it. To make the bound testable deterministically, `CommandRunner` now holds an instance limiter (defaulting to the shared, fd-derived one) and gains an internal initializer that takes an explicit `maximumConcurrentProcesses`. --- Sources/Command/CommandRunner.swift | 20 ++++++-- .../CommandTests/CommandRunnerRaceTests.swift | 47 +++++++++++++++++++ 2 files changed, 62 insertions(+), 5 deletions(-) diff --git a/Sources/Command/CommandRunner.swift b/Sources/Command/CommandRunner.swift index a588598..4e429c4 100644 --- a/Sources/Command/CommandRunner.swift +++ b/Sources/Command/CommandRunner.swift @@ -146,8 +146,19 @@ public enum CommandError: Error, CustomStringConvertible, LocalizedError, Sendab public struct CommandRunner: CommandRunning, Sendable { let logger: Logger? + /// Bounds the number of concurrently-running subprocesses so the pipe file descriptors they + /// hold can't exhaust the process's file-descriptor table. + private let processLimiter: AsyncResourceLimiter + public init(logger: Logger? = nil) { self.logger = logger + processLimiter = Self.sharedProcessLimiter + } + + /// Creates a runner with a custom cap on concurrently-running subprocesses. + init(logger: Logger? = nil, maximumConcurrentProcesses: Int) { + self.logger = logger + processLimiter = AsyncResourceLimiter(limit: maximumConcurrentProcesses) } /// File descriptors kept in reserve for stdio and other process-wide handles. @@ -160,10 +171,9 @@ public struct CommandRunner: CommandRunning, Sendable { /// Used when the file-descriptor limit can't be determined. private static let fallbackMaximumConcurrentProcesses = 16 - /// Bounds the number of concurrently-running subprocesses so the pipe file descriptors they - /// hold can't exhaust the process's file-descriptor table. The limit is derived from the - /// current soft `RLIMIT_NOFILE`, so it adapts if the limit is raised at startup. - private static let processLimiter = AsyncResourceLimiter( + /// Shared limiter whose cap is derived from the current soft `RLIMIT_NOFILE`, so it adapts if + /// the limit is raised at startup. + private static let sharedProcessLimiter = AsyncResourceLimiter( limitProvider: { systemMaximumConcurrentProcesses() } ) @@ -196,7 +206,7 @@ public struct CommandRunner: CommandRunning, Sendable { AsyncThrowingStream(CommandEvent.self, bufferingPolicy: .unbounded) { continuation in Task.detached { do { - try await Self.processLimiter.withPermit { + try await processLimiter.withPermit { let loggerMetadata: Logger.Metadata = ["command": .string(arguments.joined(separator: " "))] // Resolve the working directory if not passed. `getcwd` can transiently // return an empty path under concurrent process launches, so fall back to diff --git a/Tests/CommandTests/CommandRunnerRaceTests.swift b/Tests/CommandTests/CommandRunnerRaceTests.swift index 0438903..e437246 100644 --- a/Tests/CommandTests/CommandRunnerRaceTests.swift +++ b/Tests/CommandTests/CommandRunnerRaceTests.swift @@ -1,9 +1,56 @@ +import Foundation import Mockable import Testing @testable import Command #if !os(Linux) struct CommandRunnerRaceTests { + @Test func boundsConcurrentSubprocessLaunches() async throws { + #if os(macOS) + // Each running subprocess holds open pipe file descriptors, so an unbounded fan-out + // can exhaust the process's file-descriptor table (surfacing as EBADF on launch). + // The runner must cap how many subprocesses run at once; verify that cap holds. + let limit = 4 + let commandRunner = CommandRunner(maximumConcurrentProcesses: limit) + + let directory = FileManager.default.temporaryDirectory + .appendingPathComponent("command-limiter-\(UUID().uuidString)") + try FileManager.default.createDirectory(at: directory, withIntermediateDirectories: true) + defer { try? FileManager.default.removeItem(at: directory) } + + // Each command creates a marker file while it runs and removes it on exit, so the + // number of marker files present at any instant equals the number of subprocesses + // running concurrently. + let maxObserved = ThreadSafe(0) + let sampler = Task { + while !Task.isCancelled { + let count = (try? FileManager.default.contentsOfDirectory(atPath: directory.path))?.count ?? 0 + maxObserved.mutate { $0 = max($0, count) } + try? await Task.sleep(nanoseconds: 2_000_000) + } + } + + let script = "marker=\"\(directory.path)/$$\"; touch \"$marker\"; sleep 0.2; rm -f \"$marker\"" + await withTaskGroup(of: Void.self) { group in + for _ in 0 ..< 60 { + group.addTask { + do { + for try await _ in commandRunner.run(arguments: ["/bin/sh", "-c", script]) {} + } catch {} + } + } + await group.waitForAll() + } + sampler.cancel() + + #expect(maxObserved.value > 0, "Expected to observe running subprocesses") + #expect( + maxObserved.value <= limit, + "Observed \(maxObserved.value) concurrent subprocesses, expected at most \(limit)" + ) + #endif + } + @Test func runsManyConcurrent_successfully() async throws { #if os(Linux) || os(macOS) let commandRunner = CommandRunner() From 15782a181d8be229e85b48f871e500303dbe9b70 Mon Sep 17 00:00:00 2001 From: fortmarek Date: Tue, 2 Jun 2026 21:35:38 +0200 Subject: [PATCH 3/5] fix: constrain AsyncResourceLimiter.withPermit result to Sendable withPermit returns its operation's result across the actor's isolation boundary, so under StrictConcurrency the result type must be Sendable. CI (Swift 6.1, all platforms) failed to compile with 'non-sendable result type T cannot be sent from nonisolated context'. The only caller returns Void, so the bound is free. Co-Authored-By: Claude Opus 4.8 --- Sources/Command/AsyncResourceLimiter.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/Command/AsyncResourceLimiter.swift b/Sources/Command/AsyncResourceLimiter.swift index 711005a..580104a 100644 --- a/Sources/Command/AsyncResourceLimiter.swift +++ b/Sources/Command/AsyncResourceLimiter.swift @@ -23,7 +23,7 @@ actor AsyncResourceLimiter { self.limit = limit } - func withPermit(_ operation: @Sendable () async throws -> T) async throws -> T { + func withPermit(_ operation: @Sendable () async throws -> T) async throws -> T { try await acquire() do { From c742b0f8322aa605ba42de42adc0f54387579d3f Mon Sep 17 00:00:00 2001 From: fortmarek Date: Tue, 2 Jun 2026 21:35:38 +0200 Subject: [PATCH 4/5] fix: install stream termination handler before awaiting a process permit continuation.onTermination was installed only after the producer task had acquired a permit and created the Process. A consumer that cancelled the stream while the task was still parked in AsyncResourceLimiter.withPermit waiting for a permit left the detached task uncancelled; once a permit freed up it launched an orphaned subprocess with no consumer. Install the handler at the stream-builder level, before the permit is awaited, and cancel the producer task from it. Cancellation unwinds the task whether it is waiting for a permit (via the limiter's withTaskCancellationHandler) or already running (the published process is terminated). A post-run isCancelled check closes the publish-before-run race. Adds a red/green regression test: with a single permit, a queued command cancelled while waiting must never launch a subprocess. Co-Authored-By: Claude Opus 4.8 --- Sources/Command/CommandRunner.swift | 40 +++++++++++----- .../CommandTests/CommandRunnerRaceTests.swift | 48 +++++++++++++++++++ 2 files changed, 75 insertions(+), 13 deletions(-) diff --git a/Sources/Command/CommandRunner.swift b/Sources/Command/CommandRunner.swift index 4e429c4..8ac2fe0 100644 --- a/Sources/Command/CommandRunner.swift +++ b/Sources/Command/CommandRunner.swift @@ -204,7 +204,9 @@ public struct CommandRunner: CommandRunning, Sendable { workingDirectory: Path.AbsolutePath? = nil ) -> AsyncThrowingStream { AsyncThrowingStream(CommandEvent.self, bufferingPolicy: .unbounded) { continuation in - Task.detached { + let runningProcess = ThreadSafe(nil) + + let task = Task.detached { do { try await processLimiter.withPermit { let loggerMetadata: Logger.Metadata = ["command": .string(arguments.joined(separator: " "))] @@ -269,18 +271,9 @@ public struct CommandRunner: CommandRunning, Sendable { logger?.debug("Running sub-process", metadata: loggerMetadata) - let threadSafeProcess = ThreadSafe(process) - - continuation.onTermination = { termination in - switch termination { - case .cancelled: - if threadSafeProcess.value.isRunning { - threadSafeProcess.value.terminate() - } - default: - break - } - } + // Publish the process so the stream's termination handler (installed before + // the permit was awaited) can terminate it once it exists. + runningProcess.mutate { $0 = process } try await withCheckedThrowingContinuation { (processCompletion: CheckedContinuation) in process.terminationHandler = { _ in @@ -288,6 +281,11 @@ public struct CommandRunner: CommandRunning, Sendable { } do { try process.run() + // Close the race where the stream is cancelled after the process is + // published but before it started running. + if Task.isCancelled { + process.terminate() + } } catch { process.terminationHandler = nil processCompletion.resume(throwing: error) @@ -322,6 +320,22 @@ public struct CommandRunner: CommandRunning, Sendable { continuation.finish(throwing: error) } } + + continuation.onTermination = { termination in + switch termination { + case .cancelled: + // Cancelling the producer task unwinds it whether it is still waiting for a + // permit or already running the subprocess. + task.cancel() + runningProcess.withValue { process in + if let process, process.isRunning { + process.terminate() + } + } + default: + break + } + } } } diff --git a/Tests/CommandTests/CommandRunnerRaceTests.swift b/Tests/CommandTests/CommandRunnerRaceTests.swift index e437246..b5240a1 100644 --- a/Tests/CommandTests/CommandRunnerRaceTests.swift +++ b/Tests/CommandTests/CommandRunnerRaceTests.swift @@ -51,6 +51,54 @@ import Testing #endif } + @Test func cancellingWhileWaitingForPermit_doesNotLaunchSubprocess() async throws { + #if os(macOS) + // With a single permit, a second command is parked inside the limiter waiting for the + // permit. Cancelling its stream while it waits must unwind the producer so it never + // launches a subprocess once the permit frees up. + let commandRunner = CommandRunner(maximumConcurrentProcesses: 1) + + let directory = FileManager.default.temporaryDirectory + .appendingPathComponent("command-cancel-\(UUID().uuidString)") + try FileManager.default.createDirectory(at: directory, withIntermediateDirectories: true) + defer { try? FileManager.default.removeItem(at: directory) } + + let holderMarker = directory.appendingPathComponent("holder-running") + let blockedSentinel = directory.appendingPathComponent("blocked-launched") + + // Holds the only permit: signals it is running, then stays alive long enough for us to + // enqueue and cancel a second command while the permit is taken. + let holderScript = "touch \"\(holderMarker.path)\"; sleep 2; rm -f \"\(holderMarker.path)\"" + let holder = Task { + for try await _ in commandRunner.run(arguments: ["/bin/sh", "-c", holderScript]) {} + } + + while !FileManager.default.fileExists(atPath: holderMarker.path) { + try await Task.sleep(nanoseconds: 5_000_000) + } + + // This can only start once the permit frees. If it ever launches it creates the + // sentinel; because we cancel it while it is still waiting, the sentinel must never appear. + let blockedScript = "touch \"\(blockedSentinel.path)\"" + let blocked = Task { + for try await _ in commandRunner.run(arguments: ["/bin/sh", "-c", blockedScript]) {} + } + + try await Task.sleep(nanoseconds: 100_000_000) + blocked.cancel() + _ = await blocked.result + + // Release the permit and give any incorrectly-orphaned producer a chance to launch. + _ = try? await holder.value + try await Task.sleep(nanoseconds: 300_000_000) + + #expect( + !FileManager.default.fileExists(atPath: blockedSentinel.path), + "A command cancelled while waiting for a permit must not launch a subprocess" + ) + #endif + } + @Test func runsManyConcurrent_successfully() async throws { #if os(Linux) || os(macOS) let commandRunner = CommandRunner() From 99fd71d93b6b516fac73c72160000d9c543b7cd8 Mon Sep 17 00:00:00 2001 From: fortmarek Date: Wed, 3 Jun 2026 09:28:09 +0200 Subject: [PATCH 5/5] feat: add output redirection so non-capturing runs allocate no pipes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Capturing a subprocess's output requires two pipes (~4 file descriptors), which is the dominant per-process descriptor cost. Many callers don't need the output at all, yet still paid for the pipes. Add an `OutputRedirection` option to `run`: - `.capture` (default, unchanged): pipes + `CommandEvent`s. - `.discard`: redirects stdout/stderr to the null device — no pipes, no events. - `.inherit`: the subprocess inherits the parent's stdout/stderr — no pipes. This mirrors swift-subprocess's discard/redirect output modes and lets callers that don't capture output drop their descriptor cost to zero, easing pressure on the file-descriptor table alongside the concurrency limiter. The new variant is additive; the existing `run` signatures are unchanged. --- Sources/Command/CommandRunner.swift | 115 +++++++++++++++----- Tests/CommandTests/CommandRunnerTests.swift | 26 +++++ 2 files changed, 113 insertions(+), 28 deletions(-) diff --git a/Sources/Command/CommandRunner.swift b/Sources/Command/CommandRunner.swift index 8ac2fe0..a3e09cd 100644 --- a/Sources/Command/CommandRunner.swift +++ b/Sources/Command/CommandRunner.swift @@ -12,6 +12,18 @@ import Path import Glibc #endif +/// Controls how a subprocess's standard output and error are handled. +public enum OutputRedirection: Sendable { + /// Captures standard output and error through pipes and emits them as `CommandEvent`s. + case capture + /// Discards standard output and error by redirecting them to the null device. No pipes are + /// created, so the command holds no extra file descriptors and emits no events. + case discard + /// Lets the subprocess inherit the parent process's standard output and error. No pipes are + /// created and no events are emitted. + case inherit +} + /** `CommandRunning` is a protocol that declares the interface to run system processes. The main implementation of the protocol is `CommandRunner`. @@ -33,6 +45,21 @@ public protocol CommandRunning: Sendable { environment: [String: String], workingDirectory: Path.AbsolutePath? ) -> AsyncThrowingStream + + /// Runs a command in the system, controlling how its output is handled. + /// - Parameters: + /// - arguments: The command arguments where the first argument represents the executable. + /// - environment: The environment variables that will be passed to the process running the command. + /// - workingDirectory: The directory from where the command will be executed. + /// - output: How the subprocess's standard output and error are handled. Use ``OutputRedirection/discard`` or + /// ``OutputRedirection/inherit`` to avoid allocating output pipes when the output isn't needed. + /// - Returns: An async throwing stream to subscribe to the emitted events and completion of the underlying process. + func run( + arguments: [String], + environment: [String: String], + workingDirectory: Path.AbsolutePath?, + output: OutputRedirection + ) -> AsyncThrowingStream } extension CommandRunning { @@ -197,11 +224,20 @@ public struct CommandRunner: CommandRunning, Sendable { #endif } - // swiftlint:disable:next function_body_length public func run( arguments: [String], environment: [String: String] = ProcessInfo.processInfo.environment, workingDirectory: Path.AbsolutePath? = nil + ) -> AsyncThrowingStream { + run(arguments: arguments, environment: environment, workingDirectory: workingDirectory, output: .capture) + } + + // swiftlint:disable:next function_body_length + public func run( + arguments: [String], + environment: [String: String] = ProcessInfo.processInfo.environment, + workingDirectory: Path.AbsolutePath? = nil, + output: OutputRedirection = .capture ) -> AsyncThrowingStream { AsyncThrowingStream(CommandEvent.self, bufferingPolicy: .unbounded) { continuation in let runningProcess = ThreadSafe(nil) @@ -225,41 +261,64 @@ public struct CommandRunner: CommandRunning, Sendable { // Process let process = Process() - let stdoutPipe = Pipe() - let stderrPipe = Pipe() - let stdoutTask = Task { - do { - for try await data in stdoutPipe.fileHandleForReading.byteStream() { - continuation.yield(.standardOutput([UInt8](data))) - if let output = String(data: data, encoding: .utf8) { - logger?.debug("\(output)", metadata: loggerMetadata) + // Only allocate output pipes when the caller wants to capture output. + // Discarding or inheriting avoids holding any extra file descriptors. + let stdoutPipe: Pipe? + let stderrPipe: Pipe? + let stdoutTask: Task? + let stderrTask: Task? + + switch output { + case .capture: + let outPipe = Pipe() + let errPipe = Pipe() + stdoutPipe = outPipe + stderrPipe = errPipe + process.standardOutput = outPipe + process.standardError = errPipe + stdoutTask = Task { + do { + for try await data in outPipe.fileHandleForReading.byteStream() { + continuation.yield(.standardOutput([UInt8](data))) + if let output = String(data: data, encoding: .utf8) { + logger?.debug("\(output)", metadata: loggerMetadata) + } } + } catch { + logger?.error("Error reading stdout: \(error)", metadata: loggerMetadata) } - } catch { - logger?.error("Error reading stdout: \(error)", metadata: loggerMetadata) } - } - - let stderrTask = Task { - do { - for try await data in stderrPipe.fileHandleForReading.byteStream() { - continuation.yield(.standardError([UInt8](data))) - if let output = String(data: data, encoding: .utf8) { - collectedStdErr.mutate { $0.append(output) } - logger?.error("\(output)", metadata: loggerMetadata) + stderrTask = Task { + do { + for try await data in errPipe.fileHandleForReading.byteStream() { + continuation.yield(.standardError([UInt8](data))) + if let output = String(data: data, encoding: .utf8) { + collectedStdErr.mutate { $0.append(output) } + logger?.error("\(output)", metadata: loggerMetadata) + } } + } catch { + logger?.error("Error reading stderr: \(error)", metadata: loggerMetadata) } - } catch { - logger?.error("Error reading stderr: \(error)", metadata: loggerMetadata) } + case .discard: + stdoutPipe = nil + stderrPipe = nil + stdoutTask = nil + stderrTask = nil + process.standardOutput = FileHandle.nullDevice + process.standardError = FileHandle.nullDevice + case .inherit: + stdoutPipe = nil + stderrPipe = nil + stdoutTask = nil + stderrTask = nil } if let workingDirectory { process.currentDirectoryURL = URL(fileURLWithPath: workingDirectory.pathString) } - process.standardOutput = stdoutPipe - process.standardError = stderrPipe process.standardInput = FileHandle.standardInput process.environment = environment @@ -292,11 +351,11 @@ public struct CommandRunner: CommandRunning, Sendable { } } - await stdoutTask.value - await stderrTask.value + await stdoutTask?.value + await stderrTask?.value - try? stdoutPipe.fileHandleForReading.close() - try? stderrPipe.fileHandleForReading.close() + try? stdoutPipe?.fileHandleForReading.close() + try? stderrPipe?.fileHandleForReading.close() switch process.terminationReason { case .exit: diff --git a/Tests/CommandTests/CommandRunnerTests.swift b/Tests/CommandTests/CommandRunnerTests.swift index 06e48c5..0a0b974 100644 --- a/Tests/CommandTests/CommandRunnerTests.swift +++ b/Tests/CommandTests/CommandRunnerTests.swift @@ -22,6 +22,32 @@ import Testing #endif } + @Test func discardOutput_emitsNoEventsButStillRuns() async throws { + #if !os(Windows) + let commandRunner = CommandRunner() + + // Capturing yields the command's output... + let captured = try await commandRunner.run(arguments: ["echo", "foo"], output: .capture) + .reduce(into: [String]()) { $0.append($1.string() ?? "") } + #expect(captured == ["foo\n"]) + + // ...while discarding allocates no pipes and emits nothing, yet still runs. + let discardedEventCount = try await commandRunner.run(arguments: ["echo", "foo"], output: .discard) + .reduce(into: 0) { count, _ in count += 1 } + #expect(discardedEventCount == 0) + #endif + } + + @Test func discardOutput_stillReportsNonZeroExit() async throws { + #if !os(Windows) + let commandRunner = CommandRunner() + + await #expect(throws: CommandError.self) { + for try await _ in commandRunner.run(arguments: ["/bin/sh", "-c", "exit 7"], output: .discard) {} + } + #endif + } + @Test func lookupExecutable_withAbsolutePath() throws { // Given let commandRunner = CommandRunner()