Skip to content

Commit 90db421

Browse files
committed
Add support for using cancellation tokens to cancel asynchronous queries. Additional tests.
1 parent 59e6c49 commit 90db421

4 files changed

Lines changed: 46 additions & 9 deletions

File tree

SQLitePCL.pretty.Async/AsyncDatabaseConnection.cs

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,11 @@ namespace SQLitePCL.pretty
3333
/// </summary>
3434
public static partial class DatabaseConnection
3535
{
36+
// FIXME: I picked this number fairly randomly. It would be good to do some experimentation
37+
// to determine if its a good default. The goal should be supporting cancelling of queries that are
38+
// actually blocking use of the database for a measurable period of time.
39+
private static readonly int defaultInterruptInstructionCount = 100;
40+
3641
private static IScheduler defaultScheduler = TaskPoolScheduler.Default;
3742

3843
/// <summary>
@@ -54,6 +59,13 @@ public static IScheduler DefaultScheduler
5459
}
5560
}
5661

62+
internal static IAsyncDatabaseConnection AsAsyncDatabaseConnection(this SQLiteDatabaseConnection This, IScheduler scheduler, int interruptInstructionCount)
63+
{
64+
Contract.Requires(This != null);
65+
Contract.Requires(scheduler != null);
66+
return new AsyncDatabaseConnectionImpl(This, scheduler, interruptInstructionCount);
67+
}
68+
5769
/// <summary>
5870
/// Returns an <see cref="IAsyncDatabaseConnection"/> instance that delegates database requests
5971
/// to the provided <see cref="IDatabaseConnection"/>.
@@ -66,9 +78,7 @@ public static IScheduler DefaultScheduler
6678
/// <returns>An <see cref="IAsyncDatabaseConnection"/> instance.</returns>
6779
public static IAsyncDatabaseConnection AsAsyncDatabaseConnection(this SQLiteDatabaseConnection This, IScheduler scheduler)
6880
{
69-
Contract.Requires(This != null);
70-
Contract.Requires(scheduler != null);
71-
return new AsyncDatabaseConnectionImpl(This, scheduler);
81+
return AsAsyncDatabaseConnection(This, scheduler, defaultInterruptInstructionCount);
7282
}
7383

7484
/// <summary>
@@ -82,7 +92,6 @@ public static IAsyncDatabaseConnection AsAsyncDatabaseConnection(this SQLiteData
8292
/// <returns>An <see cref="IAsyncDatabaseConnection"/> instance.</returns>
8393
public static IAsyncDatabaseConnection AsAsyncDatabaseConnection(this SQLiteDatabaseConnection This)
8494
{
85-
Contract.Requires(This != null);
8695
return AsAsyncDatabaseConnection(This, defaultScheduler);
8796
}
8897
}
@@ -520,6 +529,7 @@ internal sealed class AsyncDatabaseConnectionImpl : IAsyncDatabaseConnection
520529
{
521530
private readonly OperationsQueue queue = new OperationsQueue();
522531
private readonly IScheduler scheduler;
532+
private readonly int interruptInstructionCount;
523533

524534
private readonly SQLiteDatabaseConnection conn;
525535
private readonly IObservable<DatabaseTraceEventArgs> trace;
@@ -528,10 +538,11 @@ internal sealed class AsyncDatabaseConnectionImpl : IAsyncDatabaseConnection
528538

529539
private bool disposed = false;
530540

531-
internal AsyncDatabaseConnectionImpl(SQLiteDatabaseConnection conn, IScheduler scheduler)
541+
internal AsyncDatabaseConnectionImpl(SQLiteDatabaseConnection conn, IScheduler scheduler, int interruptInstructionCount)
532542
{
533543
this.conn = conn;
534544
this.scheduler = scheduler;
545+
this.interruptInstructionCount = interruptInstructionCount;
535546

536547
this.trace = Observable.FromEventPattern<DatabaseTraceEventArgs>(conn, "Trace").Select(e => e.EventArgs);
537548
this.profile = Observable.FromEventPattern<DatabaseProfileEventArgs>(conn, "Profile").Select(e => e.EventArgs);
@@ -632,7 +643,7 @@ public IObservable<T> Use<T>(Func<IDatabaseConnection, CancellationToken, IEnume
632643

633644
return queue.EnqueueOperation(ct =>
634645
{
635-
//this.conn.RegisterProgressHandler(100, () => ct.IsCancellationRequested);
646+
this.conn.RegisterProgressHandler(interruptInstructionCount, () => ct.IsCancellationRequested);
636647
try
637648
{
638649
ct.ThrowIfCancellationRequested();
@@ -653,7 +664,7 @@ public IObservable<T> Use<T>(Func<IDatabaseConnection, CancellationToken, IEnume
653664
}
654665
finally
655666
{
656-
//this.conn.RemoveProgressHandler();
667+
this.conn.RemoveProgressHandler();
657668
}
658669
}, scheduler, cancellationToken);
659670
});

SQLitePCL.pretty.Async/OperationsQueue.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,8 +121,8 @@ internal static class OperationsQueueExtensions
121121
public static Task EnqueueOperation(this OperationsQueue This, Action<CancellationToken> calculationFunc, IScheduler scheduler, CancellationToken cancellationToken)
122122
{
123123
return This.EnqueueOperation(ct =>
124-
Observable.Start(() =>
125-
calculationFunc(ct), scheduler).ToTask(ct), cancellationToken);
124+
Observable.Start(() => calculationFunc(ct), scheduler).ToTask(),
125+
cancellationToken);
126126
}
127127
}
128128
}

SQLitePCL.pretty.tests/AsyncTests/AsyncDatabaseConnectionTests.cs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ limitations under the License.
1919
using System.Collections.Generic;
2020
using System.Linq;
2121
using System.Reactive;
22+
using System.Reactive.Concurrency;
2223
using System.Reactive.Linq;
2324
using System.Threading;
2425
using System.Threading.Tasks;
@@ -251,6 +252,11 @@ await adb.Use(db =>
251252
Assert.DoesNotThrow(() => { var x = db.IsDatabaseReadOnly("main"); });
252253
Assert.DoesNotThrow(() => { using (var stmt = db.PrepareStatement("SELECT x FROM foo;")) { } });
253254

255+
int current;
256+
int highwater;
257+
Assert.DoesNotThrow(() => { db.Status(DatabaseConnectionStatusCode.CacheMiss, out current, out highwater, false); });
258+
Assert.DoesNotThrow(() => { db.WalCheckPoint("main"); });
259+
254260
db.Dispose();
255261

256262
Assert.Throws<ObjectDisposedException>(() => { var x = db.IsAutoCommit; });
@@ -261,7 +267,10 @@ await adb.Use(db =>
261267
Assert.Throws<ObjectDisposedException>(() => { var x = db.Statements; });
262268
Assert.Throws<ObjectDisposedException>(() => { var x = db.IsDatabaseReadOnly("main"); });
263269
Assert.Throws<ObjectDisposedException>(() => { var x = db.OpenBlob("", "", "", 0, true); });
270+
Assert.Throws<ObjectDisposedException>(() => { var x = db.GetTableColumnMetadata("", "", ""); });
264271
Assert.Throws<ObjectDisposedException>(() => db.PrepareStatement("SELECT x FROM foo;"));
272+
Assert.Throws<ObjectDisposedException>(() => { db.Status(DatabaseConnectionStatusCode.CacheMiss, out current, out highwater, false); });
273+
Assert.Throws<ObjectDisposedException>(() => { db.WalCheckPoint("main"); });
265274
});
266275

267276
await adb.Use(db =>
@@ -338,5 +347,21 @@ await adb.Query("Select ?, ?, ?", _0, _1, _2)
338347
Assert.AreEqual(result.Item3 != 0, _2);
339348
}
340349
}
350+
351+
[Test]
352+
public void TestStatementCancellation()
353+
{
354+
using (var adb = SQLite3.Open(":memory:").AsAsyncDatabaseConnection(TaskPoolScheduler.Default, 1))
355+
{
356+
var cts = new CancellationTokenSource();
357+
Assert.That(async () =>
358+
await adb.Use((db, ct) =>
359+
{
360+
cts.Cancel();
361+
db.Execute("Select 1;");
362+
Assert.Fail();
363+
}, cts.Token), Throws.TypeOf<TaskCanceledException>());
364+
}
365+
}
341366
}
342367
}

SQLitePCL.pretty.tests/AsyncTests/AsyncStatementTests.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ await aStmt.Use(stmt =>
4343
Assert.Throws<ObjectDisposedException>(() => { stmt.ClearBindings(); });
4444
Assert.Throws<ObjectDisposedException>(() => { stmt.MoveNext(); });
4545
Assert.Throws<ObjectDisposedException>(() => { stmt.Reset(); });
46+
Assert.Throws<ObjectDisposedException>(() => { stmt.Status(StatementStatusCode.AutoIndex, false); });
4647
});
4748

4849
await aStmt.Use(stmt =>

0 commit comments

Comments
 (0)