Skip to content

Commit 183067f

Browse files
committed
Fully functional database/sql driver
Here, implement the rest of driver functionality on `riverdatabasesql`, the existing driver for Go's built-in `database/sql` package. Previously it only supported a minimal interface allowing it to run migrations, but nothing more sophisticated like inserting jobs. The benefit of a fully functional driver is that it will allow River to be integrated with with other Go database packages that aren't built around Pgx like Bun (requested in #302) and GORM (requested in #58). I'll need to write up some documentation, but this change should make both of those integrations possible immediately. It also lays the groundwork for future non-Postgres drivers. It's going to be a little more still, but I want to take a stab at SQLite, and this change will get us a lot of the way there. There's no way with `database/sql` to support listen/notify, so here we introduce the idea of a poll only driver. River's client checks whether a driver can support listen/notify on initialization, and if not, it enters poll only mode the same way as if configured with `PollOnly`. An intuitive idiosyncrasy of this set up is that even when using the `database/sql` driver bundled here, regardless of whether they're working with Bun, GORM, or whatever, users will generally still be using Pgx under the hood since it's the only maintained and fully functional Postgres driver in the Go ecosystem. With that said, the driver still has to bundle in `lib/pq` for various constructs like `pq.Array` because we're using sqlc, and sqlc's `database/sql` driver always uses `lib/pq`. I tried to find a way around this, but came out fairly convinced that there is none. To rid ourselves of `lib/pq` completely we'd need sqlc to ship an alternative Pgx driver that used Pgx internally, but exposed a `database/sql` interface using `*sql.Tx` instead of `pgx.Tx`.
1 parent 793f370 commit 183067f

24 files changed

Lines changed: 1578 additions & 524 deletions

client.go

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -475,12 +475,16 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
475475
client.completer = jobcompleter.NewBatchCompleter(archetype, driver.GetExecutor())
476476
client.services = append(client.services, client.completer)
477477

478-
// In poll only mode, we don't try to initialize a notifier that uses
479-
// listen/notify. Instead, each service polls for changes it's
480-
// interested in. e.g. Elector polls to see if leader has expired.
481-
if !config.PollOnly {
482-
client.notifier = notifier.New(archetype, driver.GetListener(), client.monitor.SetNotifierStatus)
483-
client.services = append(client.services, client.notifier)
478+
if driver.SupportsListener() {
479+
// In poll only mode, we don't try to initialize a notifier that
480+
// uses listen/notify. Instead, each service polls for changes it's
481+
// interested in. e.g. Elector polls to see if leader has expired.
482+
if !config.PollOnly {
483+
client.notifier = notifier.New(archetype, driver.GetListener(), client.monitor.SetNotifierStatus)
484+
client.services = append(client.services, client.notifier)
485+
}
486+
} else {
487+
logger.Info("Driver does not support listener; entering poll only mode")
484488
}
485489

486490
client.elector = leadership.NewElector(archetype, driver.GetExecutor(), client.notifier, &leadership.Config{
@@ -1264,6 +1268,15 @@ func insertParamsFromArgsAndOptions(args JobArgs, insertOpts *InsertOpts) (*rive
12641268
}
12651269
if tags == nil {
12661270
tags = []string{}
1271+
} else {
1272+
for _, tag := range tags {
1273+
if len(tag) > 255 {
1274+
return nil, nil, errors.New("tags should be a maximum of 255 characters long")
1275+
}
1276+
if !tagRE.MatchString(tag) {
1277+
return nil, nil, errors.New("tags should match regex " + tagRE.String())
1278+
}
1279+
}
12671280
}
12681281

12691282
if priority > 4 {
@@ -1284,10 +1297,10 @@ func insertParamsFromArgsAndOptions(args JobArgs, insertOpts *InsertOpts) (*rive
12841297
}
12851298

12861299
insertParams := &riverdriver.JobInsertFastParams{
1287-
EncodedArgs: encodedArgs,
1300+
EncodedArgs: json.RawMessage(encodedArgs),
12881301
Kind: args.Kind(),
12891302
MaxAttempts: maxAttempts,
1290-
Metadata: metadata,
1303+
Metadata: json.RawMessage(metadata),
12911304
Priority: priority,
12921305
Queue: queue,
12931306
State: rivertype.JobStateAvailable,

client_test.go

Lines changed: 70 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/jackc/pgx/v5"
1818
"github.com/jackc/pgx/v5/pgconn"
1919
"github.com/jackc/pgx/v5/pgxpool"
20+
"github.com/jackc/pgx/v5/stdlib"
2021
"github.com/robfig/cron/v3"
2122
"github.com/stretchr/testify/require"
2223

@@ -31,6 +32,7 @@ import (
3132
"github.com/riverqueue/river/internal/util/ptrutil"
3233
"github.com/riverqueue/river/internal/util/sliceutil"
3334
"github.com/riverqueue/river/riverdriver"
35+
"github.com/riverqueue/river/riverdriver/riverdatabasesql"
3436
"github.com/riverqueue/river/riverdriver/riverpgxv5"
3537
"github.com/riverqueue/river/rivertype"
3638
)
@@ -158,7 +160,7 @@ func newTestClient(t *testing.T, dbPool *pgxpool.Pool, config *Config) *Client[p
158160
return client
159161
}
160162

161-
func startClient(ctx context.Context, t *testing.T, client *Client[pgx.Tx]) {
163+
func startClient[TTx any](ctx context.Context, t *testing.T, client *Client[TTx]) {
162164
t.Helper()
163165

164166
if err := client.Start(ctx); err != nil {
@@ -181,6 +183,21 @@ func runNewTestClient(ctx context.Context, t *testing.T, config *Config) *Client
181183
return client
182184
}
183185

186+
func subscribe[TTx any](t *testing.T, client *Client[TTx]) <-chan *Event {
187+
t.Helper()
188+
189+
subscribeChan, cancel := client.Subscribe(
190+
EventKindJobCancelled,
191+
EventKindJobCompleted,
192+
EventKindJobFailed,
193+
EventKindJobSnoozed,
194+
EventKindQueuePaused,
195+
EventKindQueueResumed,
196+
)
197+
t.Cleanup(cancel)
198+
return subscribeChan
199+
}
200+
184201
func Test_Client(t *testing.T) {
185202
t.Parallel()
186203

@@ -211,21 +228,6 @@ func Test_Client(t *testing.T) {
211228
return newTestClient(t, bundle.dbPool, config), bundle
212229
}
213230

214-
subscribe := func(t *testing.T, client *Client[pgx.Tx]) <-chan *Event {
215-
t.Helper()
216-
217-
subscribeChan, cancel := client.Subscribe(
218-
EventKindJobCancelled,
219-
EventKindJobCompleted,
220-
EventKindJobFailed,
221-
EventKindJobSnoozed,
222-
EventKindQueuePaused,
223-
EventKindQueueResumed,
224-
)
225-
t.Cleanup(cancel)
226-
return subscribeChan
227-
}
228-
229231
t.Run("StartInsertAndWork", func(t *testing.T) {
230232
t.Parallel()
231233

@@ -604,7 +606,40 @@ func Test_Client(t *testing.T) {
604606
}
605607
})
606608

607-
t.Run("PollOnly", func(t *testing.T) {
609+
t.Run("PollOnlyDriver", func(t *testing.T) {
610+
t.Parallel()
611+
612+
config, bundle := setupConfig(t)
613+
bundle.config.PollOnly = true
614+
615+
stdPool := stdlib.OpenDBFromPool(bundle.dbPool)
616+
t.Cleanup(func() { require.NoError(t, stdPool.Close()) })
617+
618+
client, err := NewClient(riverdatabasesql.New(stdPool), config)
619+
require.NoError(t, err)
620+
621+
client.testSignals.Init()
622+
623+
// Notifier should not have been initialized at all.
624+
require.Nil(t, client.notifier)
625+
626+
insertRes, err := client.Insert(ctx, &noOpArgs{}, nil)
627+
require.NoError(t, err)
628+
629+
subscribeChan := subscribe(t, client)
630+
startClient(ctx, t, client)
631+
632+
// Despite no notifier, the client should still be able to elect itself
633+
// leader.
634+
client.testSignals.electedLeader.WaitOrTimeout()
635+
636+
event := riverinternaltest.WaitOrTimeout(t, subscribeChan)
637+
require.Equal(t, EventKindJobCompleted, event.Kind)
638+
require.Equal(t, insertRes.Job.ID, event.Job.ID)
639+
require.Equal(t, rivertype.JobStateCompleted, event.Job.State)
640+
})
641+
642+
t.Run("PollOnlyOption", func(t *testing.T) {
608643
t.Parallel()
609644

610645
config, bundle := setupConfig(t)
@@ -4185,6 +4220,24 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
41854220
require.Equal(t, []string{"tag1", "tag2"}, insertParams.Tags)
41864221
})
41874222

4223+
t.Run("TagFormatValidated", func(t *testing.T) {
4224+
t.Parallel()
4225+
4226+
{
4227+
_, _, err := insertParamsFromArgsAndOptions(&customInsertOptsJobArgs{}, &InsertOpts{
4228+
Tags: []string{strings.Repeat("h", 256)},
4229+
})
4230+
require.EqualError(t, err, "tags should be a maximum of 255 characters long")
4231+
}
4232+
4233+
{
4234+
_, _, err := insertParamsFromArgsAndOptions(&customInsertOptsJobArgs{}, &InsertOpts{
4235+
Tags: []string{"tag,with,comma"},
4236+
})
4237+
require.EqualError(t, err, "tags should match regex "+tagRE.String())
4238+
}
4239+
})
4240+
41884241
t.Run("UniqueOpts", func(t *testing.T) {
41894242
t.Parallel()
41904243

driver_test.go

Lines changed: 26 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import (
2020
"github.com/riverqueue/river/rivertype"
2121
)
2222

23-
func TestDriverDatabaseSQL_Executor(t *testing.T) {
23+
func TestDriverDatabaseSQL(t *testing.T) {
2424
t.Parallel()
2525

2626
ctx := context.Background()
@@ -29,42 +29,41 @@ func TestDriverDatabaseSQL_Executor(t *testing.T) {
2929
stdPool := stdlib.OpenDBFromPool(dbPool)
3030
t.Cleanup(func() { require.NoError(t, stdPool.Close()) })
3131

32-
driver := riverdatabasesql.New(nil)
33-
riverdrivertest.ExerciseExecutorMigrationOnly(ctx, t, driver, func(ctx context.Context, t *testing.T) *sql.Tx {
34-
t.Helper()
32+
riverdrivertest.Exercise(ctx, t,
33+
func(ctx context.Context, t *testing.T) riverdriver.Driver[*sql.Tx] {
34+
t.Helper()
3535

36-
tx, err := stdPool.BeginTx(ctx, nil)
37-
require.NoError(t, err)
38-
t.Cleanup(func() { _ = tx.Rollback() })
36+
return riverdatabasesql.New(stdPool)
37+
},
38+
func(ctx context.Context, t *testing.T) riverdriver.Executor {
39+
t.Helper()
3940

40-
return tx
41-
})
42-
}
43-
44-
func TestDriverRiverPgxV5_Executor(t *testing.T) {
45-
t.Parallel()
46-
47-
ctx := context.Background()
41+
tx, err := stdPool.BeginTx(ctx, nil)
42+
require.NoError(t, err)
43+
t.Cleanup(func() { _ = tx.Rollback() })
4844

49-
driver := riverpgxv5.New(nil)
50-
riverdrivertest.ExerciseExecutorFull(ctx, t, driver, func(ctx context.Context, t *testing.T) pgx.Tx {
51-
t.Helper()
52-
53-
return riverinternaltest.TestTx(ctx, t)
54-
})
45+
return riverdatabasesql.New(nil).UnwrapExecutor(tx)
46+
})
5547
}
5648

57-
func TestDriverRiverPgxV5_Listener(t *testing.T) {
49+
func TestDriverRiverPgxV5(t *testing.T) {
5850
t.Parallel()
5951

6052
ctx := context.Background()
6153

62-
riverdrivertest.ExerciseListener(ctx, t, func(ctx context.Context, t *testing.T) riverdriver.Driver[pgx.Tx] {
63-
t.Helper()
54+
riverdrivertest.Exercise(ctx, t,
55+
func(ctx context.Context, t *testing.T) riverdriver.Driver[pgx.Tx] {
56+
t.Helper()
6457

65-
dbPool := riverinternaltest.TestDB(ctx, t)
66-
return riverpgxv5.New(dbPool)
67-
})
58+
dbPool := riverinternaltest.TestDB(ctx, t)
59+
return riverpgxv5.New(dbPool)
60+
},
61+
func(ctx context.Context, t *testing.T) riverdriver.Executor {
62+
t.Helper()
63+
64+
tx := riverinternaltest.TestTx(ctx, t)
65+
return riverpgxv5.New(nil).UnwrapExecutor(tx)
66+
})
6867
}
6968

7069
func BenchmarkDriverRiverPgxV5_Executor(b *testing.B) {

insert_opts.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,21 @@ package river
33
import (
44
"errors"
55
"fmt"
6+
"regexp"
67
"slices"
78
"time"
89

910
"github.com/riverqueue/river/rivertype"
1011
)
1112

13+
// Regular expression to which the format of tags must comply. Mainly, no
14+
// special characters, and with hyphens in the middle.
15+
//
16+
// A key property here (in case this is relaxed in the future) is that commas
17+
// must never be allowed because they're used as a delimiter during batch job
18+
// insertion for the `riverdatabasesql` driver.
19+
var tagRE = regexp.MustCompile(`\A[\w][\w\-]+[\w]\z`)
20+
1221
// InsertOpts are optional settings for a new job which can be provided at job
1322
// insertion time. These will override any default InsertOpts settings provided
1423
// by JobArgsWithInsertOpts, as well as any global defaults.
@@ -58,6 +67,9 @@ type InsertOpts struct {
5867
// functional behavior and are meant entirely as a user-specified construct
5968
// to help group and categorize jobs.
6069
//
70+
// Tags should conform to the regex `\A[\w][\w\-]+[\w]\z` and be a maximum
71+
// of 255 characters long. No special characters are allowed.
72+
//
6173
// If tags are specified from both a job args override and from options on
6274
// Insert, the latter takes precedence. Tags are not merged.
6375
Tags []string

insert_opts_test.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,28 @@ import (
99
"github.com/riverqueue/river/rivertype"
1010
)
1111

12+
func TestTagRE(t *testing.T) {
13+
t.Parallel()
14+
15+
require.Regexp(t, tagRE, "aaa")
16+
require.Regexp(t, tagRE, "_aaa")
17+
require.Regexp(t, tagRE, "aaa_")
18+
require.Regexp(t, tagRE, "777")
19+
require.Regexp(t, tagRE, "my-tag")
20+
require.Regexp(t, tagRE, "my_tag")
21+
require.Regexp(t, tagRE, "my_longer_tag")
22+
require.Regexp(t, tagRE, "My_Capitalized_Tag")
23+
require.Regexp(t, tagRE, "ALL_CAPS")
24+
require.Regexp(t, tagRE, "1_2_3")
25+
26+
require.NotRegexp(t, tagRE, "a")
27+
require.NotRegexp(t, tagRE, "aa")
28+
require.NotRegexp(t, tagRE, "-aaa")
29+
require.NotRegexp(t, tagRE, "aaa-")
30+
require.NotRegexp(t, tagRE, "special@characters$banned")
31+
require.NotRegexp(t, tagRE, "commas,never,allowed")
32+
}
33+
1234
func TestJobUniqueOpts_validate(t *testing.T) {
1335
t.Parallel()
1436

0 commit comments

Comments
 (0)