Skip to content

Commit 4d4c914

Browse files
committed
Fully functional database/sql driver
Here, implement the rest of driver functionality on `riverdatabasesql`, the existing driver for Go's built-in `database/sql` package. Previously it only supported a minimal interface allowing it to run migrations, but nothing more sophisticated like inserting jobs. The benefit of a fully functional driver is that it will allow River to be integrated with with other Go database packages that aren't built around Pgx like Bun (requested in #302) and GORM (requested in #58). I'll need to write up some documentation, but this change should make both of those integrations possible immediately. It also lays the groundwork for future non-Postgres drivers. It's going to be a little more still, but I want to take a stab at SQLite, and this change will get us a lot of the way there. There's no way with `database/sql` to support listen/notify, so here we introduce the idea of a poll only driver. River's client checks whether a driver can support listen/notify on initialization, and if not, it enters poll only mode the same way as if configured with `PollOnly`. An intuitive idiosyncrasy of this set up is that even when using the `database/sql` driver bundled here, regardless of whether they're working with Bun, GORM, or whatever, users will generally still be using Pgx under the hood since it's the only maintained and fully functional Postgres driver in the Go ecosystem. With that said, the driver still has to bundle in `lib/pq` for various constructs like `pq.Array` because we're using sqlc, and sqlc's `database/sql` driver always uses `lib/pq`. I tried to find a way around this, but came out fairly convinced that there is none. To rid ourselves of `lib/pq` completely we'd need sqlc to ship an alternative Pgx driver that used Pgx internally, but exposed a `database/sql` interface using `*sql.Tx` instead of `pgx.Tx`.
1 parent 57a6622 commit 4d4c914

23 files changed

Lines changed: 1596 additions & 506 deletions

client.go

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -493,12 +493,16 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
493493
client.subscriptionManager = newSubscriptionManager(archetype, nil)
494494
client.services = append(client.services, client.completer, client.subscriptionManager)
495495

496-
// In poll only mode, we don't try to initialize a notifier that uses
497-
// listen/notify. Instead, each service polls for changes it's
498-
// interested in. e.g. Elector polls to see if leader has expired.
499-
if !config.PollOnly {
500-
client.notifier = notifier.New(archetype, driver.GetListener(), client.monitor.SetNotifierStatus)
501-
client.services = append(client.services, client.notifier)
496+
if driver.SupportsListener() {
497+
// In poll only mode, we don't try to initialize a notifier that
498+
// uses listen/notify. Instead, each service polls for changes it's
499+
// interested in. e.g. Elector polls to see if leader has expired.
500+
if !config.PollOnly {
501+
client.notifier = notifier.New(archetype, driver.GetListener(), client.monitor.SetNotifierStatus)
502+
client.services = append(client.services, client.notifier)
503+
}
504+
} else {
505+
logger.Info("Driver does not support listener; entering poll only mode")
502506
}
503507

504508
client.elector = leadership.NewElector(archetype, driver.GetExecutor(), client.notifier, &leadership.Config{
@@ -1155,6 +1159,15 @@ func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, conf
11551159
}
11561160
if tags == nil {
11571161
tags = []string{}
1162+
} else {
1163+
for _, tag := range tags {
1164+
if len(tag) > 255 {
1165+
return nil, nil, errors.New("tags should be a maximum of 255 characters long")
1166+
}
1167+
if !tagRE.MatchString(tag) {
1168+
return nil, nil, errors.New("tags should match regex " + tagRE.String())
1169+
}
1170+
}
11581171
}
11591172

11601173
if priority > 4 {
@@ -1176,10 +1189,10 @@ func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, conf
11761189

11771190
insertParams := &riverdriver.JobInsertFastParams{
11781191
CreatedAt: createdAt,
1179-
EncodedArgs: encodedArgs,
1192+
EncodedArgs: json.RawMessage(encodedArgs),
11801193
Kind: args.Kind(),
11811194
MaxAttempts: maxAttempts,
1182-
Metadata: metadata,
1195+
Metadata: json.RawMessage(metadata),
11831196
Priority: priority,
11841197
Queue: queue,
11851198
State: rivertype.JobStateAvailable,

client_test.go

Lines changed: 70 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/jackc/pgx/v5"
1818
"github.com/jackc/pgx/v5/pgconn"
1919
"github.com/jackc/pgx/v5/pgxpool"
20+
"github.com/jackc/pgx/v5/stdlib"
2021
"github.com/robfig/cron/v3"
2122
"github.com/stretchr/testify/require"
2223

@@ -31,6 +32,7 @@ import (
3132
"github.com/riverqueue/river/internal/util/ptrutil"
3233
"github.com/riverqueue/river/internal/util/sliceutil"
3334
"github.com/riverqueue/river/riverdriver"
35+
"github.com/riverqueue/river/riverdriver/riverdatabasesql"
3436
"github.com/riverqueue/river/riverdriver/riverpgxv5"
3537
"github.com/riverqueue/river/rivertype"
3638
)
@@ -160,7 +162,7 @@ func newTestClient(t *testing.T, dbPool *pgxpool.Pool, config *Config) *Client[p
160162
return client
161163
}
162164

163-
func startClient(ctx context.Context, t *testing.T, client *Client[pgx.Tx]) {
165+
func startClient[TTx any](ctx context.Context, t *testing.T, client *Client[TTx]) {
164166
t.Helper()
165167

166168
if err := client.Start(ctx); err != nil {
@@ -183,6 +185,21 @@ func runNewTestClient(ctx context.Context, t *testing.T, config *Config) *Client
183185
return client
184186
}
185187

188+
func subscribe[TTx any](t *testing.T, client *Client[TTx]) <-chan *Event {
189+
t.Helper()
190+
191+
subscribeChan, cancel := client.Subscribe(
192+
EventKindJobCancelled,
193+
EventKindJobCompleted,
194+
EventKindJobFailed,
195+
EventKindJobSnoozed,
196+
EventKindQueuePaused,
197+
EventKindQueueResumed,
198+
)
199+
t.Cleanup(cancel)
200+
return subscribeChan
201+
}
202+
186203
func Test_Client(t *testing.T) {
187204
t.Parallel()
188205

@@ -213,21 +230,6 @@ func Test_Client(t *testing.T) {
213230
return newTestClient(t, bundle.dbPool, config), bundle
214231
}
215232

216-
subscribe := func(t *testing.T, client *Client[pgx.Tx]) <-chan *Event {
217-
t.Helper()
218-
219-
subscribeChan, cancel := client.Subscribe(
220-
EventKindJobCancelled,
221-
EventKindJobCompleted,
222-
EventKindJobFailed,
223-
EventKindJobSnoozed,
224-
EventKindQueuePaused,
225-
EventKindQueueResumed,
226-
)
227-
t.Cleanup(cancel)
228-
return subscribeChan
229-
}
230-
231233
t.Run("StartInsertAndWork", func(t *testing.T) {
232234
t.Parallel()
233235

@@ -636,7 +638,40 @@ func Test_Client(t *testing.T) {
636638
}
637639
})
638640

639-
t.Run("PollOnly", func(t *testing.T) {
641+
t.Run("PollOnlyDriver", func(t *testing.T) {
642+
t.Parallel()
643+
644+
config, bundle := setupConfig(t)
645+
bundle.config.PollOnly = true
646+
647+
stdPool := stdlib.OpenDBFromPool(bundle.dbPool)
648+
t.Cleanup(func() { require.NoError(t, stdPool.Close()) })
649+
650+
client, err := NewClient(riverdatabasesql.New(stdPool), config)
651+
require.NoError(t, err)
652+
653+
client.testSignals.Init()
654+
655+
// Notifier should not have been initialized at all.
656+
require.Nil(t, client.notifier)
657+
658+
insertRes, err := client.Insert(ctx, &noOpArgs{}, nil)
659+
require.NoError(t, err)
660+
661+
subscribeChan := subscribe(t, client)
662+
startClient(ctx, t, client)
663+
664+
// Despite no notifier, the client should still be able to elect itself
665+
// leader.
666+
client.testSignals.electedLeader.WaitOrTimeout()
667+
668+
event := riverinternaltest.WaitOrTimeout(t, subscribeChan)
669+
require.Equal(t, EventKindJobCompleted, event.Kind)
670+
require.Equal(t, insertRes.Job.ID, event.Job.ID)
671+
require.Equal(t, rivertype.JobStateCompleted, event.Job.State)
672+
})
673+
674+
t.Run("PollOnlyOption", func(t *testing.T) {
640675
t.Parallel()
641676

642677
config, bundle := setupConfig(t)
@@ -4472,6 +4507,24 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
44724507
require.Equal(t, []string{"tag1", "tag2"}, insertParams.Tags)
44734508
})
44744509

4510+
t.Run("TagFormatValidated", func(t *testing.T) {
4511+
t.Parallel()
4512+
4513+
{
4514+
_, _, err := insertParamsFromConfigArgsAndOptions(archetype, config, &customInsertOptsJobArgs{}, &InsertOpts{
4515+
Tags: []string{strings.Repeat("h", 256)},
4516+
})
4517+
require.EqualError(t, err, "tags should be a maximum of 255 characters long")
4518+
}
4519+
4520+
{
4521+
_, _, err := insertParamsFromConfigArgsAndOptions(archetype, config, &customInsertOptsJobArgs{}, &InsertOpts{
4522+
Tags: []string{"tag,with,comma"},
4523+
})
4524+
require.EqualError(t, err, "tags should match regex "+tagRE.String())
4525+
}
4526+
})
4527+
44754528
t.Run("UniqueOpts", func(t *testing.T) {
44764529
t.Parallel()
44774530

driver_test.go

Lines changed: 26 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import (
2020
"github.com/riverqueue/river/rivertype"
2121
)
2222

23-
func TestDriverDatabaseSQL_Executor(t *testing.T) {
23+
func TestDriverDatabaseSQL(t *testing.T) {
2424
t.Parallel()
2525

2626
ctx := context.Background()
@@ -29,42 +29,41 @@ func TestDriverDatabaseSQL_Executor(t *testing.T) {
2929
stdPool := stdlib.OpenDBFromPool(dbPool)
3030
t.Cleanup(func() { require.NoError(t, stdPool.Close()) })
3131

32-
driver := riverdatabasesql.New(nil)
33-
riverdrivertest.ExerciseExecutorMigrationOnly(ctx, t, driver, func(ctx context.Context, t *testing.T) *sql.Tx {
34-
t.Helper()
32+
riverdrivertest.Exercise(ctx, t,
33+
func(ctx context.Context, t *testing.T) riverdriver.Driver[*sql.Tx] {
34+
t.Helper()
3535

36-
tx, err := stdPool.BeginTx(ctx, nil)
37-
require.NoError(t, err)
38-
t.Cleanup(func() { _ = tx.Rollback() })
36+
return riverdatabasesql.New(stdPool)
37+
},
38+
func(ctx context.Context, t *testing.T) riverdriver.Executor {
39+
t.Helper()
3940

40-
return tx
41-
})
42-
}
43-
44-
func TestDriverRiverPgxV5_Executor(t *testing.T) {
45-
t.Parallel()
46-
47-
ctx := context.Background()
41+
tx, err := stdPool.BeginTx(ctx, nil)
42+
require.NoError(t, err)
43+
t.Cleanup(func() { _ = tx.Rollback() })
4844

49-
driver := riverpgxv5.New(nil)
50-
riverdrivertest.ExerciseExecutorFull(ctx, t, driver, func(ctx context.Context, t *testing.T) pgx.Tx {
51-
t.Helper()
52-
53-
return riverinternaltest.TestTx(ctx, t)
54-
})
45+
return riverdatabasesql.New(nil).UnwrapExecutor(tx)
46+
})
5547
}
5648

57-
func TestDriverRiverPgxV5_Listener(t *testing.T) {
49+
func TestDriverRiverPgxV5(t *testing.T) {
5850
t.Parallel()
5951

6052
ctx := context.Background()
6153

62-
riverdrivertest.ExerciseListener(ctx, t, func(ctx context.Context, t *testing.T) riverdriver.Driver[pgx.Tx] {
63-
t.Helper()
54+
riverdrivertest.Exercise(ctx, t,
55+
func(ctx context.Context, t *testing.T) riverdriver.Driver[pgx.Tx] {
56+
t.Helper()
6457

65-
dbPool := riverinternaltest.TestDB(ctx, t)
66-
return riverpgxv5.New(dbPool)
67-
})
58+
dbPool := riverinternaltest.TestDB(ctx, t)
59+
return riverpgxv5.New(dbPool)
60+
},
61+
func(ctx context.Context, t *testing.T) riverdriver.Executor {
62+
t.Helper()
63+
64+
tx := riverinternaltest.TestTx(ctx, t)
65+
return riverpgxv5.New(nil).UnwrapExecutor(tx)
66+
})
6867
}
6968

7069
func BenchmarkDriverRiverPgxV5_Executor(b *testing.B) {

insert_opts.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,21 @@ package river
33
import (
44
"errors"
55
"fmt"
6+
"regexp"
67
"slices"
78
"time"
89

910
"github.com/riverqueue/river/rivertype"
1011
)
1112

13+
// Regular expression to which the format of tags must comply. Mainly, no
14+
// special characters, and with hyphens in the middle.
15+
//
16+
// A key property here (in case this is relaxed in the future) is that commas
17+
// must never be allowed because they're used as a delimiter during batch job
18+
// insertion for the `riverdatabasesql` driver.
19+
var tagRE = regexp.MustCompile(`\A[\w][\w\-]+[\w]\z`)
20+
1221
// InsertOpts are optional settings for a new job which can be provided at job
1322
// insertion time. These will override any default InsertOpts settings provided
1423
// by JobArgsWithInsertOpts, as well as any global defaults.
@@ -58,6 +67,9 @@ type InsertOpts struct {
5867
// functional behavior and are meant entirely as a user-specified construct
5968
// to help group and categorize jobs.
6069
//
70+
// Tags should conform to the regex `\A[\w][\w\-]+[\w]\z` and be a maximum
71+
// of 255 characters long. No special characters are allowed.
72+
//
6173
// If tags are specified from both a job args override and from options on
6274
// Insert, the latter takes precedence. Tags are not merged.
6375
Tags []string

insert_opts_test.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,29 @@ import (
99
"github.com/riverqueue/river/rivertype"
1010
)
1111

12+
func TestTagRE(t *testing.T) {
13+
t.Parallel()
14+
15+
require.Regexp(t, tagRE, "aaa")
16+
require.Regexp(t, tagRE, "_aaa")
17+
require.Regexp(t, tagRE, "aaa_")
18+
require.Regexp(t, tagRE, "777")
19+
require.Regexp(t, tagRE, "my-tag")
20+
require.Regexp(t, tagRE, "my_tag")
21+
require.Regexp(t, tagRE, "my-longer-tag")
22+
require.Regexp(t, tagRE, "my_longer_tag")
23+
require.Regexp(t, tagRE, "My_Capitalized_Tag")
24+
require.Regexp(t, tagRE, "ALL_CAPS")
25+
require.Regexp(t, tagRE, "1_2_3")
26+
27+
require.NotRegexp(t, tagRE, "a")
28+
require.NotRegexp(t, tagRE, "aa")
29+
require.NotRegexp(t, tagRE, "-aaa")
30+
require.NotRegexp(t, tagRE, "aaa-")
31+
require.NotRegexp(t, tagRE, "special@characters$banned")
32+
require.NotRegexp(t, tagRE, "commas,never,allowed")
33+
}
34+
1235
func TestJobUniqueOpts_validate(t *testing.T) {
1336
t.Parallel()
1437

0 commit comments

Comments
 (0)