11import Foundation
2+ import NIOConcurrencyHelpers
23
34package actor RefreshCodeIssuesCoordinator {
45 package struct Permit : Sendable {
@@ -20,7 +21,8 @@ package actor RefreshCodeIssuesCoordinator {
2021
2122 package nonisolated let maxPendingPerKey : Int
2223 package nonisolated let maxPendingTotal : Int
23- package nonisolated let queueWaitTimeoutNanoseconds : UInt64
24+ package nonisolated let queueWaitTimeout : Duration
25+ package nonisolated let queueWaitClock : any Clock < Duration > & Sendable
2426 private var nextWaiterID : UInt64 = 0
2527 private var busyKeys : Set < String > = [ ]
2628 private var pendingWaiterCount = 0
@@ -40,14 +42,29 @@ package actor RefreshCodeIssuesCoordinator {
4042 maxPendingPerKey: Int = 4 ,
4143 maxPendingTotal: Int = 32 ,
4244 queueWaitTimeout: TimeInterval = 30
45+ ) {
46+ self . init (
47+ maxPendingPerKey: maxPendingPerKey,
48+ maxPendingTotal: maxPendingTotal,
49+ queueWaitTimeout: Self . duration ( from: queueWaitTimeout) ,
50+ queueWaitClock: ContinuousClock ( )
51+ )
52+ }
53+
54+ package init (
55+ maxPendingPerKey: Int = 4 ,
56+ maxPendingTotal: Int = 32 ,
57+ queueWaitTimeout: Duration ,
58+ queueWaitClock: any Clock < Duration > & Sendable = ContinuousClock ( )
4359 ) {
4460 self . maxPendingPerKey = max ( 0 , maxPendingPerKey)
4561 self . maxPendingTotal = max ( 0 , maxPendingTotal)
46- self . queueWaitTimeoutNanoseconds = Self . nanoseconds ( from: queueWaitTimeout)
62+ self . queueWaitTimeout = queueWaitTimeout
63+ self . queueWaitClock = queueWaitClock
4764 }
4865
4966 package nonisolated var queueWaitTimeoutSeconds : Double {
50- Double ( queueWaitTimeoutNanoseconds ) / 1_000_000_000
67+ Self . seconds ( from : queueWaitTimeout )
5168 }
5269
5370 package func withPermit< T: Sendable > (
@@ -88,18 +105,16 @@ package actor RefreshCodeIssuesCoordinator {
88105 pendingTotal: pendingWaiterCount + 1
89106 )
90107
91- let timeoutTask = Task { [ queueWaitTimeoutNanoseconds] in
92- do {
93- try await Task . sleep ( nanoseconds: queueWaitTimeoutNanoseconds)
94- self . timeoutWaiter ( key: key, waiterID: waiterID)
95- } catch {
96- return
97- }
98- }
108+ let timeoutTaskBox = NIOLockedValueBox < Task < Void , Never > ? > ( nil )
99109
100110 return try await withTaskCancellationHandler (
101111 operation: {
102- defer { timeoutTask. cancel ( ) }
112+ defer {
113+ timeoutTaskBox. withLockedValue { task in
114+ task? . cancel ( )
115+ task = nil
116+ }
117+ }
103118
104119 return try await withCheckedThrowingContinuation { continuation in
105120 waitersByKey [ key, default: [ ] ] . append (
@@ -111,6 +126,18 @@ package actor RefreshCodeIssuesCoordinator {
111126 )
112127 pendingWaiterCount += 1
113128
129+ let timeoutTask = Task { [ queueWaitClock, queueWaitTimeout] in
130+ do {
131+ try await queueWaitClock. sleep ( for: queueWaitTimeout)
132+ self . timeoutWaiter ( key: key, waiterID: waiterID)
133+ } catch {
134+ return
135+ }
136+ }
137+ timeoutTaskBox. withLockedValue { task in
138+ task = timeoutTask
139+ }
140+
114141 if Task . isCancelled {
115142 failWaiter (
116143 key: key,
@@ -121,7 +148,10 @@ package actor RefreshCodeIssuesCoordinator {
121148 }
122149 } ,
123150 onCancel: {
124- timeoutTask. cancel ( )
151+ timeoutTaskBox. withLockedValue { task in
152+ task? . cancel ( )
153+ task = nil
154+ }
125155 Task {
126156 await self . cancelWaiter ( key: key, waiterID: waiterID)
127157 }
@@ -183,4 +213,15 @@ package actor RefreshCodeIssuesCoordinator {
183213 }
184214 return UInt64 ( nanoseconds. rounded ( . up) )
185215 }
216+
217+ private static func seconds( from duration: Duration ) -> Double {
218+ let components = duration. components
219+ return Double ( components. seconds)
220+ + ( Double ( components. attoseconds) / 1_000_000_000_000_000_000 )
221+ }
222+
223+ private static func duration( from interval: TimeInterval ) -> Duration {
224+ let clampedNanoseconds = min ( nanoseconds ( from: interval) , UInt64 ( Int64 . max) )
225+ return . nanoseconds( Int64 ( clampedNanoseconds) )
226+ }
186227}
0 commit comments