Skip to content

Commit 0f161eb

Browse files
update Handle function for presetIDStrategy
1 parent 5ad03d5 commit 0f161eb

15 files changed

Lines changed: 1487 additions & 102 deletions

go.mod

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
module github.com/viant/sqlx
22

3-
go 1.21.6
4-
5-
toolchain go1.21.12
3+
go 1.22.0
64

75
require (
86
github.com/aerospike/aerospike-client-go v4.5.2+incompatible
@@ -36,13 +34,10 @@ require (
3634
github.com/aerospike/aerospike-client-go/v6 v6.15.1 // indirect
3735
github.com/andybalholm/brotli v1.0.4 // indirect
3836
github.com/davecgh/go-spew v1.1.1 // indirect
39-
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.0-20210816181553-5444fa50b93d // indirect
4037
github.com/felixge/httpsnoop v1.0.4 // indirect
4138
github.com/go-errors/errors v1.5.1 // indirect
4239
github.com/go-logr/logr v1.4.1 // indirect
4340
github.com/go-logr/stdr v1.2.2 // indirect
44-
github.com/goccy/go-json v0.9.11 // indirect
45-
github.com/golang-jwt/jwt/v4 v4.4.1 // indirect
4641
github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe // indirect
4742
github.com/golang-sql/sqlexp v0.1.0 // indirect
4843
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
@@ -51,19 +46,12 @@ require (
5146
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
5247
github.com/googleapis/gax-go/v2 v2.12.0 // indirect
5348
github.com/klauspost/compress v1.15.9 // indirect
54-
github.com/lestrrat-go/backoff/v2 v2.0.8 // indirect
55-
github.com/lestrrat-go/blackmagic v1.0.0 // indirect
56-
github.com/lestrrat-go/httpcc v1.0.1 // indirect
57-
github.com/lestrrat-go/iter v1.0.1 // indirect
58-
github.com/lestrrat-go/jwx v1.2.25 // indirect
59-
github.com/lestrrat-go/option v1.0.0 // indirect
6049
github.com/mattn/go-runewidth v0.0.9 // indirect
6150
github.com/olekukonko/tablewriter v0.0.5 // indirect
6251
github.com/onsi/ginkgo v1.16.5 // indirect
6352
github.com/pierrec/lz4/v4 v4.1.15 // indirect
6453
github.com/pmezard/go-difflib v1.0.0 // indirect
6554
github.com/segmentio/encoding v0.3.5 // indirect
66-
github.com/viant/scy v0.3.2-0.20220825213848-acc5c59cde78 // indirect
6755
github.com/viant/sqlparser v0.7.4 // indirect
6856
github.com/viant/x v0.3.0 // indirect
6957
github.com/yuin/gopher-lua v1.1.1 // indirect

io/insert/record_numeric.go

Lines changed: 28 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,16 @@ package insert
33
import (
44
"context"
55
"fmt"
6+
"reflect"
7+
"sync"
8+
69
"github.com/viant/sqlx"
710
"github.com/viant/sqlx/io"
811
"github.com/viant/sqlx/metadata"
912
"github.com/viant/sqlx/metadata/info"
1013
"github.com/viant/sqlx/metadata/info/dialect"
1114
"github.com/viant/sqlx/metadata/sink"
1215
"github.com/viant/sqlx/option"
13-
"reflect"
14-
"sync"
1516
)
1617

1718
type numericSequencer struct {
@@ -68,7 +69,7 @@ func (n *numericSequencer) nextSequence(ctx context.Context, sess *session, reco
6869
return nil, nil
6970
case dialect.PresetIDStrategyUndefined:
7071
n.shallPresetIdentities = false
71-
n.updateSequence(ctx, n.getSequenceName(sess), recordCount)
72+
n.updateSequencer(ctx, n.getSequenceName(sess), recordCount)
7273
return nil, nil
7374
case dialect.PresetIDWithMax:
7475
options = append(options, n.maxIDSQLBuilder(sess))
@@ -89,32 +90,26 @@ func (n *numericSequencer) nextSequence(ctx context.Context, sess *session, reco
8990
return n.sequence, nil
9091
}
9192

92-
func (n *numericSequencer) transientDMLBuilder(sess *session, record interface{}, batchRecordBuffer []interface{}, recordCount int64) func(*sink.Sequence) (*sqlx.SQL, error) {
93-
return func(sequence *sink.Sequence) (*sqlx.SQL, error) {
93+
func (n *numericSequencer) transientDMLBuilder(sess *session, record interface{}, batchRecordBuffer []interface{}, recordCount int64) func(*sink.Sequence) (*sqlx.SQL, int64, error) {
94+
return func(sequence *sink.Sequence) (*sqlx.SQL, int64, error) {
9495
resetAutoincrementQuery := sess.Builder.Build(record, option.BatchSize(1))
9596
resetAutoincrementQuery = sess.Dialect.EnsurePlaceholders(resetAutoincrementQuery)
9697
sess.binder(record, batchRecordBuffer, 0, len(sess.columns))
9798

9899
values := make([]interface{}, len(sess.columns))
99100
copy(values, batchRecordBuffer[0:len(sess.columns)-1]) // don't copy ID pointer (last position in slice)
100101

101-
oldValue := sequence.Value
102-
var passedValue int64
103-
104-
switch recordCount {
105-
default:
106-
sequence.Value = sequence.NextValue(recordCount)
107-
if diff := sequence.Value - oldValue; diff < recordCount {
108-
return nil, fmt.Errorf("new next value for sequenceName %d is too small, expected >= %d but had ", sequence.Value, oldValue+recordCount)
109-
}
110-
passedValue = sequence.Value - sequence.IncrementBy // decreasing is required for transient insert approach
102+
maxIdValue, err := sequence.ComputeNextForTransient(recordCount)
103+
if err != nil {
104+
return nil, 0, err
111105
}
112-
values[len(sess.columns)-1] = &passedValue
106+
107+
values[len(sess.columns)-1] = &maxIdValue
113108
resetAutoincrementSQL := &sqlx.SQL{
114109
Query: resetAutoincrementQuery,
115110
Args: values,
116111
}
117-
return resetAutoincrementSQL, nil
112+
return resetAutoincrementSQL, recordCount, nil
118113
}
119114
}
120115

@@ -149,50 +144,41 @@ func (n *numericSequencer) getColumn() io.Column {
149144
}
150145

151146
func (n *numericSequencer) prepareSequenceIfNeeded(ctx context.Context, sess *session, record interface{}, columnValue *interface{}, recordCount int, identitiesBatched []interface{}, options []option.Option) error {
152-
// presetting sequence only once reserves (if implemented) sequence values on db only one time
153-
if n.detectedPreset {
154-
return nil
155-
}
156-
157-
if recordCount == 0 {
147+
// presetting sequence only once reserves (if implemented) sequence values on db only one time per batch operation
148+
if recordCount <= 0 {
158149
return nil
159150
}
160151

161152
n.muxPreset.Lock()
153+
defer n.muxPreset.Unlock()
154+
162155
if n.detectedPreset {
163-
n.muxPreset.Unlock()
164156
return nil
165157
}
166158

159+
if columnValue == nil {
160+
return fmt.Errorf("columnValue is nil")
161+
}
162+
167163
isColumnZeroValue := isZero(*columnValue)
168164
n.shallPresetIdentities = isColumnZeroValue
169165

170166
if isColumnZeroValue {
171-
var err error
172-
n.sequence, err = n.nextSequence(ctx, sess, record, identitiesBatched, recordCount, options)
167+
seq, err := n.nextSequence(ctx, sess, record, identitiesBatched, recordCount, options)
173168
if err != nil {
174-
n.muxPreset.Unlock()
175169
return err
176170
}
177-
//} else { // n.sequence should be nil (it's not in use), and it's important in afterFlush func
178-
// n.updateSequence(ctx, n.getSequenceName(sess), recordCount)
171+
n.sequence = seq
179172
}
180173

181174
if n.sequence != nil && n.shallPresetIdentities && n.sequenceValue == nil {
182-
var seqValue int64
183-
184-
switch recordCount {
185-
case 1: // TODO not proved that miss some edge cases, if does then only default case should be used
186-
seqValue = n.sequence.Value - n.sequence.IncrementBy
187-
default:
188-
seqValue = n.sequence.MinValue(int64(recordCount))
189-
}
190-
175+
seqValue := n.sequence.MinValue(int64(recordCount))
191176
n.sequenceValue = &seqValue
192177
}
193178

194-
n.detectedPreset = true // detectPreset must be here to avoid sending 0 in preset mode to db
195-
n.muxPreset.Unlock()
179+
// detectPreset must be set at the end to avoid sending 0 in preset mode to db
180+
n.detectedPreset = true
181+
196182
return nil
197183
}
198184

@@ -237,7 +223,7 @@ func (n *numericSequencer) afterFlush(ctx context.Context, values []interface{},
237223
}
238224
//in case there is a batch insert, we need to check if last inserted ID is the same as the sequence value
239225
//if so we can safely update the identities with the new sequence value within the batch
240-
n.updateSequence(ctx, n.sequence.Name, int(rowsAffected))
226+
n.updateSequencer(ctx, n.sequence.Name, int(rowsAffected))
241227
sequenceValue = n.sequence.Value
242228
expectedNextInsertID := (1 + rowsAffected) * inceremntBy
243229
if expectedNextInsertID != sequenceValue { //race condition during batch insert, skip updating IDs
@@ -272,7 +258,7 @@ func isZero(value interface{}) bool {
272258
}
273259
}
274260

275-
func (n *numericSequencer) updateSequence(ctx context.Context, sequenceName string, recordCount int) {
261+
func (n *numericSequencer) updateSequencer(ctx context.Context, sequenceName string, recordCount int) {
276262
meta := metadata.New()
277263
options := []option.Option{option.NewArgs(n.session.info.Catalog, n.session.info.Schema, sequenceName), n.session.Dialect}
278264

io/insert/service.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ func New(ctx context.Context, db *sql.DB, tableName string, options ...option.Op
4848
return inserter, nil
4949
}
5050

51-
// NextSequence resets next updateSequence
51+
// NextSequence resets next updateSequencer
5252
func (s *Service) NextSequence(ctx context.Context, any interface{}, recordCount int, options ...option.Option) (*sink.Sequence, error) {
5353
valueAt, count, err := io.Values(any)
5454
if err != nil {
@@ -153,6 +153,22 @@ func (s *Service) NewSession(ctx context.Context, record interface{}, db *sql.DB
153153
defer s.mux.Unlock()
154154
rType := reflect.TypeOf(record)
155155
if sess := s.cachedSession; sess != nil && sess.rType == rType && sess.batchSize == batchSize {
156+
// Reset per-call state on cached numeric sequencers before returning
157+
// to avoid carrying over detected preset state between sessions.
158+
for _, updater := range s.cachedSession.recordUpdaters {
159+
if asNumeric, ok := updater.(*numericSequencer); ok {
160+
// reset preset/sequence flags under locks
161+
asNumeric.muxPreset.Lock()
162+
asNumeric.muxSequenceValue.Lock()
163+
asNumeric.detectedPreset = false
164+
asNumeric.sequence = nil
165+
asNumeric.shallPresetIdentities = true
166+
asNumeric.sequenceValue = nil
167+
asNumeric.muxSequenceValue.Unlock()
168+
asNumeric.muxPreset.Unlock()
169+
}
170+
}
171+
156172
if db == nil {
157173
db = sess.db
158174
}

metadata/product/mysql/sequence/handler.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,12 @@ import (
44
"context"
55
"database/sql"
66
"fmt"
7-
"github.com/viant/sqlx/metadata/sink"
8-
"github.com/viant/sqlx/option"
97
"os"
108
"strconv"
119
"strings"
10+
11+
"github.com/viant/sqlx/metadata/sink"
12+
"github.com/viant/sqlx/option"
1213
)
1314

1415
const autoincrementAssignment = "AUTO_INCREMENT="
@@ -53,7 +54,7 @@ func updateSequence(ctx context.Context, db *sql.DB, sequence *sink.Sequence, tx
5354
seqValueFragment := DDL[index+len(autoincrementAssignment):]
5455
debugSequencer := os.Getenv("DEBUG_SEQUENCER") == "true"
5556
if debugSequencer {
56-
fmt.Printf("sequencer: %v, value: %v\n", sequence.Name, seqValueFragment)
57+
fmt.Printf("u sequencer: %v, value: %v\n", sequence.Name, seqValueFragment)
5758
}
5859
if index := strings.Index(seqValueFragment, " "); index != -1 {
5960
seqValueFragment = seqValueFragment[:index]
@@ -102,10 +103,14 @@ func ensureIncrementsAndValues(ctx context.Context, db *sql.DB, sequence *sink.S
102103
sequence.MaxValue = MaxSeqValue
103104
}
104105

105-
debugSequencer := os.Getenv("DEBUG_SEQUENCER") == "true"
106-
if debugSequencer {
107-
fmt.Printf("sequencer: %v, start: %v, value: %v, max: %v, increment: %v\n", sequence.Name, sequence.StartValue, sequence.Value, sequence.MaxValue, sequence.IncrementBy)
106+
if sequence.DataType == "" {
107+
sequence.DataType = "int"
108108
}
109+
110+
//debugSequencer := os.Getenv("DEBUG_SEQUENCER") == "true"
111+
//if debugSequencer {
112+
// fmt.Printf("e sequencer_ptr: %p sequencer: %v, start: %v, value: %v, max: %v, increment: %v\n", &sequence, sequence.Name, sequence.StartValue, sequence.Value, sequence.MaxValue, sequence.IncrementBy)
113+
//}
109114
return nil
110115
}
111116

0 commit comments

Comments
 (0)