Skip to content

Commit 8622b1e

Browse files
committed
Fixed reclaim not happening
1 parent fcafe2c commit 8622b1e

2 files changed

Lines changed: 95 additions & 23 deletions

File tree

Sources/Otter/ConnectionPool.swift

Lines changed: 43 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -72,11 +72,11 @@ public actor ConnectionPool: Sendable {
7272
}
7373

7474
/// Gives the connection back to the pool.
75-
private func reclaim(tx: borrowing Transaction) async {
76-
availableConnections.append(tx.connection)
75+
private func reclaim(connection: SQLiteConnection, kind: Transaction.Kind) async {
76+
availableConnections.append(connection)
7777
alertAnyWaitersOfAvailableConnection()
7878

79-
if tx.kind == .write {
79+
if kind == .write {
8080
await writeLock.unlock()
8181
}
8282
}
@@ -111,6 +111,7 @@ public actor ConnectionPool: Sendable {
111111
/// and we need to alert anybody waiting for one.
112112
private func alertAnyWaitersOfAvailableConnection() {
113113
guard !waitingForConnection.isEmpty, !availableConnections.isEmpty else { return }
114+
// I think its handing a connection off to a cancelled task?
114115
let waiter = waitingForConnection.removeFirst()
115116
let connection = availableConnections.removeFirst()
116117
waiter.resume(with: .success(connection))
@@ -127,32 +128,51 @@ extension ConnectionPool: Connection {
127128
}
128129

129130
/// Starts a transaction.
130-
public func begin<Output>(
131+
public func begin<Output: Sendable>(
131132
_ kind: Transaction.Kind,
132133
execute: (borrowing Transaction) throws -> Output
133134
) async throws -> Output {
134-
let tx = try await begin(kind)
135-
136-
// The `Result` wrapper seems weird, but allows us to keep
137-
// tx functions consuming. Cause we cannot call `commit` in
138-
// the `do` and on failure call `rollback` since it would
139-
// have been consumed in the `commit`.
140-
//
141-
// Keeping them is consuming is nice since it stops callers
142-
// from calling `commit` manually since its borrowed
143-
let result = Result {
144-
try execute(tx)
135+
try await beginNoCommit(kind) { tx in
136+
// The `Result` wrapper seems weird, but allows us to keep
137+
// tx functions consuming. Cause we cannot call `commit` in
138+
// the `do` and on failure call `rollback` since it would
139+
// have been consumed in the `commit`.
140+
//
141+
// Keeping them is consuming is nice since it stops callers
142+
// from calling `commit` manually since its borrowed
143+
let result = Result {
144+
try Task.checkCancellation()
145+
return try execute(tx)
146+
}
147+
148+
switch result {
149+
case let .success(output):
150+
try tx.commit()
151+
observer.didCommit()
152+
return output
153+
case let .failure(error):
154+
try tx.commitOrRollback()
155+
throw error
156+
}
145157
}
158+
}
159+
160+
/// Starts a transaction for the lifetime of the closure
161+
/// and does not commit or rollback automatically. Just
162+
/// makes sure it reclaims the connection.
163+
public func beginNoCommit<Output: Sendable>(
164+
_ kind: Transaction.Kind,
165+
execute: (consuming Transaction) async throws -> Output
166+
) async throws -> Output {
167+
let tx = try await begin(kind)
168+
let conn = tx.connection
146169

147-
await reclaim(tx: tx)
148-
149-
switch result {
150-
case let .success(output):
151-
try tx.commit()
152-
observer.didCommit()
170+
do {
171+
let output = try await execute(tx)
172+
await reclaim(connection: conn, kind: kind)
153173
return output
154-
case let .failure(error):
155-
try tx.commitOrRollback()
174+
} catch {
175+
await reclaim(connection: conn, kind: kind)
156176
throw error
157177
}
158178
}

Sources/Otter/Queries/Bulk.swift

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
//
2+
// Bulk.swift
3+
// Otter
4+
//
5+
// Created by Wes Wickwire on 7/4/25.
6+
//
7+
8+
extension Queries {
9+
public struct Bulk<Base: DatabaseQuery>: DatabaseQuery {
10+
public typealias Input = [Base.Input]
11+
public typealias Output = [Base.Output]
12+
13+
let base: Base
14+
15+
public var transactionKind: Transaction.Kind {
16+
base.transactionKind
17+
}
18+
19+
public var connection: any Connection {
20+
base.connection
21+
}
22+
23+
public var watchedTables: Set<String> {
24+
base.watchedTables
25+
}
26+
27+
public func execute(
28+
with input: [Base.Input],
29+
tx: borrowing Transaction
30+
) throws -> [Base.Output] {
31+
var results: [Base.Output] = []
32+
33+
for input in input {
34+
try results.append(base.execute(with: input, tx: tx))
35+
}
36+
37+
return results
38+
}
39+
}
40+
}
41+
42+
extension DatabaseQuery {
43+
/// Returns a query that executes the same query in bulk
44+
/// for each input.
45+
///
46+
/// Note: The individual statements are still executed one
47+
/// at a time but is done in a single transaction so the
48+
/// write to disk only happens once.
49+
public func bulk() -> Queries.Bulk<Self> {
50+
Queries.Bulk(base: self)
51+
}
52+
}

0 commit comments

Comments
 (0)