diff --git a/go.mod b/go.mod index 6eed43e3bc..aacd45aae7 100644 --- a/go.mod +++ b/go.mod @@ -56,7 +56,7 @@ require ( github.com/mna/pigeon v1.3.0 github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 github.com/nats-io/nats-server/v2 v2.14.2 - github.com/nats-io/nats.go v1.51.0 + github.com/nats-io/nats.go v1.52.0 github.com/olekukonko/tablewriter v1.1.4 github.com/onsi/ginkgo v1.16.5 github.com/onsi/ginkgo/v2 v2.28.3 diff --git a/go.sum b/go.sum index 81020cf62c..9d62dadf29 100644 --- a/go.sum +++ b/go.sum @@ -901,8 +901,8 @@ github.com/nats-io/jwt/v2 v2.8.2 h1:XXRgB60MSTnqsRwejQurVDs/hcv2dkt+86GjI+I/bMc= github.com/nats-io/jwt/v2 v2.8.2/go.mod h1:Ag/56sq9OblL4JgdYufDd16Egb17Kr/8WwwuO/forVc= github.com/nats-io/nats-server/v2 v2.14.2 h1:Q7dRhCY03Y00rETFW3KV+KGaCIajlDfWgWUVgbMxyuk= github.com/nats-io/nats-server/v2 v2.14.2/go.mod h1:lWpb1bSpRELZfRdlMkdz8E7lbXKKyNe8RIn0vvepIHs= -github.com/nats-io/nats.go v1.51.0 h1:ByW84XTz6W03GSSsygsZcA+xgKK8vPGaa/FCAAEHnAI= -github.com/nats-io/nats.go v1.51.0/go.mod h1:26HypzazeOkyO3/mqd1zZd53STJN0EjCYF9Uy2ZOBno= +github.com/nats-io/nats.go v1.52.0 h1:n3avV4VBsCgsdwh71TppsTwtv+QdPs7ntSKM8qJLGsc= +github.com/nats-io/nats.go v1.52.0/go.mod h1:26HypzazeOkyO3/mqd1zZd53STJN0EjCYF9Uy2ZOBno= github.com/nats-io/nkeys v0.4.16 h1:rd5oAuLOb8mnAycB0xleuEBNS1pVVnN0fv/FF34Eypg= github.com/nats-io/nkeys v0.4.16/go.mod h1:llLgWoI0o4z/Q57q2R1kHfmocyhGV6VG/U18Glg1Afs= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= diff --git a/vendor/github.com/nats-io/nats.go/README.md b/vendor/github.com/nats-io/nats.go/README.md index 37413ecdce..dd47aa422b 100644 --- a/vendor/github.com/nats-io/nats.go/README.md +++ b/vendor/github.com/nats-io/nats.go/README.md @@ -7,8 +7,8 @@ A [Go](http://golang.org) client for the [NATS messaging system](https://nats.io [License-Image]: https://img.shields.io/badge/License-Apache2-blue.svg [ReportCard-Url]: https://goreportcard.com/report/github.com/nats-io/nats.go [ReportCard-Image]: https://goreportcard.com/badge/github.com/nats-io/nats.go -[Build-Status-Url]: https://github.com/nats-io/nats.go/actions -[Build-Status-Image]: https://github.com/nats-io/nats.go/actions/workflows/ci.yaml/badge.svg?branch=main +[Build-Status-Url]: https://github.com/nats-io/nats.go/actions/workflows/ci.yaml?query=event%3Arelease +[Build-Status-Image]: https://github.com/nats-io/nats.go/actions/workflows/ci.yaml/badge.svg?event=release [GoDoc-Url]: https://pkg.go.dev/github.com/nats-io/nats.go [GoDoc-Image]: https://img.shields.io/badge/GoDoc-reference-007d9c [Coverage-Url]: https://coveralls.io/r/nats-io/nats.go?branch=main @@ -23,7 +23,7 @@ A [Go](http://golang.org) client for the [NATS messaging system](https://nats.io go get github.com/nats-io/nats.go@latest # To get a specific version: -go get github.com/nats-io/nats.go@v1.51.0 +go get github.com/nats-io/nats.go@v1.52.0 # Note that the latest major version for NATS Server is v2: go get github.com/nats-io/nats-server/v2@latest diff --git a/vendor/github.com/nats-io/nats.go/go_test.mod b/vendor/github.com/nats-io/nats.go/go_test.mod index 80e6c7daf0..6a1e42660e 100644 --- a/vendor/github.com/nats-io/nats.go/go_test.mod +++ b/vendor/github.com/nats-io/nats.go/go_test.mod @@ -6,17 +6,17 @@ require ( github.com/golang/protobuf v1.5.4 github.com/klauspost/compress v1.18.5 github.com/nats-io/jwt/v2 v2.8.1 - github.com/nats-io/nats-server/v2 v2.12.6 + github.com/nats-io/nats-server/v2 v2.14.0 github.com/nats-io/nkeys v0.4.15 github.com/nats-io/nuid v1.0.1 google.golang.org/protobuf v1.33.0 ) require ( - github.com/antithesishq/antithesis-sdk-go v0.6.0-default-no-op // indirect + github.com/antithesishq/antithesis-sdk-go v0.7.0-default-no-op // indirect github.com/google/go-tpm v0.9.8 // indirect - github.com/minio/highwayhash v1.0.4-0.20251030100505-070ab1a87a76 // indirect - golang.org/x/crypto v0.49.0 // indirect - golang.org/x/sys v0.42.0 // indirect + github.com/minio/highwayhash v1.0.4 // indirect + golang.org/x/crypto v0.50.0 // indirect + golang.org/x/sys v0.43.0 // indirect golang.org/x/time v0.15.0 // indirect ) diff --git a/vendor/github.com/nats-io/nats.go/go_test.sum b/vendor/github.com/nats-io/nats.go/go_test.sum index 10127110d4..58effba4bd 100644 --- a/vendor/github.com/nats-io/nats.go/go_test.sum +++ b/vendor/github.com/nats-io/nats.go/go_test.sum @@ -1,47 +1,29 @@ -github.com/antithesishq/antithesis-sdk-go v0.6.0-default-no-op h1:kpBdlEPbRvff0mDD1gk7o9BhI16b9p5yYAXRlidpqJE= -github.com/antithesishq/antithesis-sdk-go v0.6.0-default-no-op/go.mod h1:IUpT2DPAKh6i/YhSbt6Gl3v2yvUZjmKncl7U91fup7E= -github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= -github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= -github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= -github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= -github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= -github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0= -github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/antithesishq/antithesis-sdk-go v0.7.0-default-no-op h1:Z/MZK75wC/NSrkgqeNIa7jexam9uWzhLmFTSCPI/kn0= +github.com/antithesishq/antithesis-sdk-go v0.7.0-default-no-op/go.mod h1:FQyySiasQQM8735Ddel3MRojmy4dA1IqCeyJ5jmPMbI= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= -github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= -github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= -github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= -github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-tpm v0.9.8 h1:slArAR9Ft+1ybZu0lBwpSmpwhRXaa85hWtMinMyRAWo= github.com/google/go-tpm v0.9.8/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY= github.com/klauspost/compress v1.18.5 h1:/h1gH5Ce+VWNLSWqPzOVn6XBO+vJbCNGvjoaGBFW2IE= github.com/klauspost/compress v1.18.5/go.mod h1:cwPg85FWrGar70rWktvGQj8/hthj3wpl0PGDogxkrSQ= -github.com/minio/highwayhash v1.0.4-0.20251030100505-070ab1a87a76 h1:KGuD/pM2JpL9FAYvBrnBBeENKZNh6eNtjqytV6TYjnk= -github.com/minio/highwayhash v1.0.4-0.20251030100505-070ab1a87a76/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ= +github.com/minio/highwayhash v1.0.4 h1:asJizugGgchQod2ja9NJlGOWq4s7KsAWr5XUc9Clgl4= +github.com/minio/highwayhash v1.0.4/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ= github.com/nats-io/jwt/v2 v2.8.1 h1:V0xpGuD/N8Mi+fQNDynXohVvp7ZztevW5io8CUWlPmU= github.com/nats-io/jwt/v2 v2.8.1/go.mod h1:nWnOEEiVMiKHQpnAy4eXlizVEtSfzacZ1Q43LIRavZg= -github.com/nats-io/nats-server/v2 v2.12.6 h1:Egbx9Vl7Ch8wTtpXPGqbehkZ+IncKqShUxvrt1+Enc8= -github.com/nats-io/nats-server/v2 v2.12.6/go.mod h1:4HPlrvtmSO3yd7KcElDNMx9kv5EBJBnJJzQPptXlheo= +github.com/nats-io/nats-server/v2 v2.14.0 h1:+8q0HrDFotwLLcGH/legOEOnowunhK+aZ4GYBIWpQlM= +github.com/nats-io/nats-server/v2 v2.14.0/go.mod h1:ImVUUDvfClJbb6cuJQRc1VmgDCXKM5ds0OoiG9MVOKo= github.com/nats-io/nkeys v0.4.15 h1:JACV5jRVO9V856KOapQ7x+EY8Jo3qw1vJt/9Jpwzkk4= github.com/nats-io/nkeys v0.4.15/go.mod h1:CpMchTXC9fxA5zrMo4KpySxNjiDVvr8ANOSZdiNfUrs= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= -golang.org/x/crypto v0.49.0 h1:+Ng2ULVvLHnJ/ZFEq4KdcDd/cfjrrjjNSXNzxg0Y4U4= -golang.org/x/crypto v0.49.0/go.mod h1:ErX4dUh2UM+CFYiXZRTcMpEcN8b/1gxEuv3nODoYtCA= +golang.org/x/crypto v0.50.0 h1:zO47/JPrL6vsNkINmLoo/PH1gcxpls50DNogFvB5ZGI= +golang.org/x/crypto v0.50.0/go.mod h1:3muZ7vA7PBCE6xgPX7nkzzjiUq87kRItoJQM1Yo8S+Q= golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo= -golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= +golang.org/x/sys v0.43.0 h1:Rlag2XtaFTxp19wS8MXlJwTvoh8ArU6ezoyFsMyCTNI= +golang.org/x/sys v0.43.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= golang.org/x/time v0.15.0 h1:bbrp8t3bGUeFOx08pvsMYRTCVSMk89u4tKbNOZbp88U= golang.org/x/time v0.15.0/go.mod h1:Y4YMaQmXwGQZoFaVFk4YpCt4FLQMYKZe9oeV/f4MSno= -golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= -google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= -google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= -google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= -google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= -google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM= -google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= diff --git a/vendor/github.com/nats-io/nats.go/jetstream/api.go b/vendor/github.com/nats-io/nats.go/jetstream/api.go index e5410b2bbb..99f23924f3 100644 --- a/vendor/github.com/nats-io/nats.go/jetstream/api.go +++ b/vendor/github.com/nats-io/nats.go/jetstream/api.go @@ -107,6 +107,9 @@ const ( // apiConsumerUnpinT is the endpoint to unpin a consumer. apiConsumerUnpinT = "CONSUMER.UNPIN.%s.%s" + + // apiConsumerResetT is the endpoint to reset a consumer's delivery state. + apiConsumerResetT = "CONSUMER.RESET.%s.%s" ) func (js *jetStream) apiRequestJSON(ctx context.Context, subject string, resp any, data ...[]byte) (*jetStreamMsg, error) { diff --git a/vendor/github.com/nats-io/nats.go/jetstream/consumer.go b/vendor/github.com/nats-io/nats.go/jetstream/consumer.go index ee0c6f1f22..ec196b62e0 100644 --- a/vendor/github.com/nats-io/nats.go/jetstream/consumer.go +++ b/vendor/github.com/nats-io/nats.go/jetstream/consumer.go @@ -496,6 +496,37 @@ func resumeConsumer(ctx context.Context, js *jetStream, stream, consumer string) return pauseConsumer(ctx, js, stream, consumer, nil) } +func resetConsumer(ctx context.Context, js *jetStream, stream, consumer string, seq uint64) (*ConsumerResetResponse, error) { + ctx, cancel := js.wrapContextWithoutDeadline(ctx) + if cancel != nil { + defer cancel() + } + if err := validateConsumerName(consumer); err != nil { + return nil, err + } + subject := fmt.Sprintf(apiConsumerResetT, stream, consumer) + + req, err := json.Marshal(consumerResetRequest{Seq: seq}) + if err != nil { + return nil, err + } + + var resp consumerResetApiResponse + if _, err := js.apiRequestJSON(ctx, subject, &resp, req); err != nil { + return nil, err + } + if resp.Error != nil { + if resp.Error.ErrorCode == JSErrCodeConsumerInvalidReset { + return nil, ErrConsumerInvalidReset + } + return nil, resp.Error + } + if resp.ConsumerInfo == nil { + return nil, ErrConsumerResetResponseEmpty + } + return &resp.ConsumerResetResponse, nil +} + func validateConsumerName(name string) error { if name == "" { return fmt.Errorf("%w: name is required", ErrInvalidConsumerName) diff --git a/vendor/github.com/nats-io/nats.go/jetstream/consumer_config.go b/vendor/github.com/nats-io/nats.go/jetstream/consumer_config.go index b7234b5629..0fbcac9a9e 100644 --- a/vendor/github.com/nats-io/nats.go/jetstream/consumer_config.go +++ b/vendor/github.com/nats-io/nats.go/jetstream/consumer_config.go @@ -498,6 +498,13 @@ const ( // AckNonePolicy requires no acks for delivered messages. AckNonePolicy + + // AckFlowControlPolicy functions like AckAllPolicy, but acks based on + // responses to flow control. Used by durable stream sourcing and + // mirroring against an existing durable push consumer. + // + // This feature requires nats-server v2.14.0 or later. + AckFlowControlPolicy ) func (p *AckPolicy) UnmarshalJSON(data []byte) error { @@ -508,6 +515,8 @@ func (p *AckPolicy) UnmarshalJSON(data []byte) error { *p = AckAllPolicy case jsonString("explicit"): *p = AckExplicitPolicy + case jsonString("flow_control"): + *p = AckFlowControlPolicy default: return fmt.Errorf("nats: cannot unmarshal %q", data) } @@ -522,6 +531,8 @@ func (p AckPolicy) MarshalJSON() ([]byte, error) { return json.Marshal("all") case AckExplicitPolicy: return json.Marshal("explicit") + case AckFlowControlPolicy: + return json.Marshal("flow_control") } return nil, fmt.Errorf("nats: unknown acknowledgement policy %v", p) } @@ -534,6 +545,8 @@ func (p AckPolicy) String() string { return "AckAll" case AckExplicitPolicy: return "AckExplicit" + case AckFlowControlPolicy: + return "AckFlowControl" } return "Unknown AckPolicy" } diff --git a/vendor/github.com/nats-io/nats.go/jetstream/errors.go b/vendor/github.com/nats-io/nats.go/jetstream/errors.go index 4cba865ae9..e32d44035f 100644 --- a/vendor/github.com/nats-io/nats.go/jetstream/errors.go +++ b/vendor/github.com/nats-io/nats.go/jetstream/errors.go @@ -65,6 +65,17 @@ const ( JSErrCodeConsumerEmptyFilter ErrorCode = 10139 JSErrCodeConsumerExists ErrorCode = 10148 JSErrCodeConsumerDoesNotExist ErrorCode = 10149 + + JSErrCodeMirrorWithMsgSchedules ErrorCode = 10186 + JSErrCodeSourceWithMsgSchedules ErrorCode = 10187 + JSErrCodeMessageSchedulesDisabled ErrorCode = 10188 + JSErrCodeSchedulePatternInvalid ErrorCode = 10189 + JSErrCodeScheduleTargetInvalid ErrorCode = 10190 + JSErrCodeScheduleTTLInvalid ErrorCode = 10191 + JSErrCodeScheduleRollupInvalid ErrorCode = 10192 + JSErrCodeScheduleSourceInvalid ErrorCode = 10203 + + JSErrCodeConsumerInvalidReset ErrorCode = 10204 ) var ( @@ -132,6 +143,17 @@ var ( // with UpdateConsumer but a consumer with given name does not exist. ErrConsumerDoesNotExist JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeConsumerDoesNotExist, Description: "consumer does not exist", Code: 400}} + // ErrConsumerResetResponseEmpty is returned when the response from the + // server to a consumer reset request is missing the ConsumerInfo + // payload. The reset may or may not have taken effect. + ErrConsumerResetResponseEmpty JetStreamError = &jsError{message: "consumer reset response is empty"} + + // ErrConsumerInvalidReset is returned when ResetConsumerToSequence is + // called with a sequence that violates the consumer's DeliverPolicy + // constraints (e.g. seq below OptStartSeq, or non-zero seq with a + // DeliverPolicy other than all/by-start-sequence/by-start-time). + ErrConsumerInvalidReset JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeConsumerInvalidReset, Description: "invalid reset", Code: 400}} + // ErrMsgNotFound is returned when message with provided sequence number // does not exist. ErrMsgNotFound JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeMessageNotFound, Description: "message not found", Code: 404}} @@ -158,6 +180,38 @@ var ( // ErrEmptyFilter is returned when a filter in FilterSubjects is empty. ErrEmptyFilter JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeConsumerEmptyFilter, Description: "consumer filter in FilterSubjects cannot be empty", Code: 500}} + // ErrScheduleTargetInvalid is returned when publishing a scheduled + // message without a valid target subject. + ErrScheduleTargetInvalid JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeScheduleTargetInvalid, Description: "message schedules target is invalid", Code: 400}} + + // ErrSchedulePatternInvalid is returned when the schedule expression + // is not a valid @at, @every, or cron pattern. + ErrSchedulePatternInvalid JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeSchedulePatternInvalid, Description: "message schedules pattern is invalid", Code: 400}} + + // ErrMessageSchedulesDisabled is returned when publishing a scheduled + // message to a stream that does not have AllowMsgSchedules enabled. + ErrMessageSchedulesDisabled JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeMessageSchedulesDisabled, Description: "message schedules is disabled", Code: 400}} + + // ErrScheduleSourceInvalid is returned when the schedule source + // subject is invalid. + ErrScheduleSourceInvalid JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeScheduleSourceInvalid, Description: "message schedules source is invalid", Code: 400}} + + // ErrScheduleTTLInvalid is returned when the schedule TTL value + // is not a valid duration or "never". + ErrScheduleTTLInvalid JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeScheduleTTLInvalid, Description: "message schedules invalid per-message TTL", Code: 400}} + + // ErrScheduleRollupInvalid is returned when a scheduled message + // has an invalid rollup configuration. + ErrScheduleRollupInvalid JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeScheduleRollupInvalid, Description: "message schedules invalid rollup", Code: 400}} + + // ErrMirrorWithMsgSchedules is returned when attempting to enable + // message scheduling on a mirror stream. + ErrMirrorWithMsgSchedules JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeMirrorWithMsgSchedules, Description: "stream mirrors can not also schedule messages", Code: 400}} + + // ErrSourceWithMsgSchedules is returned when attempting to enable + // message scheduling on a stream with sources. + ErrSourceWithMsgSchedules JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeSourceWithMsgSchedules, Description: "stream source can not also schedule messages", Code: 400}} + // Client errors // ErrConsumerMultipleFilterSubjectsNotSupported is returned when the diff --git a/vendor/github.com/nats-io/nats.go/jetstream/jetstream.go b/vendor/github.com/nats-io/nats.go/jetstream/jetstream.go index e2c15ac636..0fb61d8259 100644 --- a/vendor/github.com/nats-io/nats.go/jetstream/jetstream.go +++ b/vendor/github.com/nats-io/nats.go/jetstream/jetstream.go @@ -213,6 +213,16 @@ type ( // ResumeConsumer resumes a paused consumer. ResumeConsumer(ctx context.Context, stream string, consumer string) (*ConsumerPauseResponse, error) + // ResetConsumer resets a consumer's delivery state. The consumer is + // reset to deliver from ack_floor + 1. + ResetConsumer(ctx context.Context, stream, consumer string) (*ConsumerResetResponse, error) + + // ResetConsumerToSequence resets a consumer's delivery state to the + // given stream sequence. The seq must be compatible with the + // consumer's DeliverPolicy. If incompatible, ErrConsumerInvalidReset + // is returned. + ResetConsumerToSequence(ctx context.Context, stream, consumer string, seq uint64) (*ConsumerResetResponse, error) + // CreateOrUpdatePushConsumer creates a push consumer on a given stream with // the given config. If consumer already exists, it will be updated (if // possible). Consumer interface is returned, allowing to consume messages. @@ -978,6 +988,20 @@ func (js *jetStream) ResumeConsumer(ctx context.Context, stream string, consumer return resumeConsumer(ctx, js, stream, consumer) } +func (js *jetStream) ResetConsumer(ctx context.Context, stream, consumer string) (*ConsumerResetResponse, error) { + if err := validateStreamName(stream); err != nil { + return nil, err + } + return resetConsumer(ctx, js, stream, consumer, 0) +} + +func (js *jetStream) ResetConsumerToSequence(ctx context.Context, stream, consumer string, seq uint64) (*ConsumerResetResponse, error) { + if err := validateStreamName(stream); err != nil { + return nil, err + } + return resetConsumer(ctx, js, stream, consumer, seq) +} + func validateStreamName(stream string) error { if stream == "" { return ErrStreamNameRequired diff --git a/vendor/github.com/nats-io/nats.go/jetstream/jetstream_options.go b/vendor/github.com/nats-io/nats.go/jetstream/jetstream_options.go index 54b5ee36a5..10c8f51043 100644 --- a/vendor/github.com/nats-io/nats.go/jetstream/jetstream_options.go +++ b/vendor/github.com/nats-io/nats.go/jetstream/jetstream_options.go @@ -709,6 +709,117 @@ func WithStallWait(ttl time.Duration) PublishOpt { } } +// Predefined schedule constants for use with [WithScheduleCron]. +const ( + ScheduleYearly = "@yearly" + ScheduleMonthly = "@monthly" + ScheduleWeekly = "@weekly" + ScheduleDaily = "@daily" + ScheduleHourly = "@hourly" +) + +// WithScheduleAt schedules a message for one-time delivery at a specific time. +// Requires [StreamConfig.AllowMsgSchedules] to be enabled. +func WithScheduleAt(t time.Time) PublishOpt { + return func(opts *pubOpts) error { + opts.schedule = "@at " + t.UTC().Format(time.RFC3339) + return nil + } +} + +// WithScheduleEvery schedules a message for repeated delivery at the given +// interval. The minimum interval is 1 second. +// Requires [StreamConfig.AllowMsgSchedules] to be enabled. +func WithScheduleEvery(d time.Duration) PublishOpt { + return func(opts *pubOpts) error { + if d < time.Second { + return fmt.Errorf("%w: schedule interval must be at least 1s", ErrInvalidOption) + } + opts.schedule = "@every " + d.String() + return nil + } +} + +// WithScheduleCron schedules a message using a cron expression. +// Accepts 6-field cron format (seconds minutes hours day-of-month month +// day-of-week) or predefined schedules (e.g. [ScheduleHourly], +// [ScheduleDaily]). +// Requires [StreamConfig.AllowMsgSchedules] to be enabled. +func WithScheduleCron(expr string) PublishOpt { + return func(opts *pubOpts) error { + if expr == "" { + return fmt.Errorf("%w: cron expression cannot be empty", ErrInvalidOption) + } + opts.schedule = expr + return nil + } +} + +// WithScheduleTarget sets the subject where scheduled messages will be +// delivered. +// This option is required for scheduled messages. +// Requires [StreamConfig.AllowMsgSchedules] to be enabled. +func WithScheduleTarget(subject string) PublishOpt { + return func(opts *pubOpts) error { + if subject == "" { + return fmt.Errorf("%w: schedule target subject cannot be empty", ErrInvalidOption) + } + opts.scheduleTarget = subject + return nil + } +} + +// WithScheduleSource instructs the scheduler to read the latest message +// from the given subject and republish it on schedule. Used for data +// sampling use cases. +// Requires [StreamConfig.AllowMsgSchedules] to be enabled. +func WithScheduleSource(subject string) PublishOpt { + return func(opts *pubOpts) error { + if subject == "" { + return fmt.Errorf("%w: schedule source subject cannot be empty", ErrInvalidOption) + } + opts.scheduleSource = subject + return nil + } +} + +// WithScheduleTTL sets the TTL on messages generated by the scheduler. +// Requires [StreamConfig.AllowMsgTTL] to be enabled on the stream. +func WithScheduleTTL(d time.Duration) PublishOpt { + return func(opts *pubOpts) error { + if d <= 0 { + return fmt.Errorf("%w: schedule TTL must be greater than 0", ErrInvalidOption) + } + opts.scheduleTTL = d.String() + return nil + } +} + +// WithScheduleTTLNever marks messages generated by the scheduler as never +// expiring. This overrides the stream's MaxAge for delivered messages. +// Requires [StreamConfig.AllowMsgTTL] to be enabled on the stream. +func WithScheduleTTLNever() PublishOpt { + return func(opts *pubOpts) error { + opts.scheduleTTL = "never" + return nil + } +} + +// WithScheduleTimeZone sets the time zone for cron schedule expressions. +// The zone must be a valid IANA time zone name (e.g., "America/New_York"). +// Only valid with cron schedule expressions; the server will reject this +// header if used with @at or @every schedules. +// Requires [StreamConfig.AllowMsgSchedules] to be enabled. +func WithScheduleTimeZone(zone string) PublishOpt { + return func(opts *pubOpts) error { + if zone == "" { + return fmt.Errorf("%w: schedule time zone cannot be empty", ErrInvalidOption) + } + opts.scheduleTZ = zone + return nil + } +} + type nextOptFunc func(*nextOpts) func (fn nextOptFunc) configureNext(opts *nextOpts) { diff --git a/vendor/github.com/nats-io/nats.go/jetstream/message.go b/vendor/github.com/nats-io/nats.go/jetstream/message.go index 357fc298b6..4a3d1ed524 100644 --- a/vendor/github.com/nats-io/nats.go/jetstream/message.go +++ b/vendor/github.com/nats-io/nats.go/jetstream/message.go @@ -215,6 +215,45 @@ const ( // MarkerReasonHeader is used to specify a reason for message deletion. MarkerReasonHeader = "Nats-Marker-Reason" + + // ScheduleHeader is used to specify a schedule expression for a message. + // Valid values include "@at ", "@every ", 6-field cron + // expressions, and predefined schedules (@yearly, @monthly, @weekly, + // @daily, @hourly). + // + // This can be set when publishing messages using [WithScheduleAt], + // [WithScheduleEvery], or [WithScheduleCron] options. + // Requires [StreamConfig.AllowMsgSchedules] to be enabled. + ScheduleHeader = "Nats-Schedule" + + // ScheduleTargetHeader specifies the subject where scheduled messages + // will be delivered. + // + // This can be set when publishing messages using [WithScheduleTarget] option. + ScheduleTargetHeader = "Nats-Schedule-Target" + + // ScheduleSourceHeader instructs the scheduler to read the latest message + // from the given subject and republish it on schedule. Used for data + // sampling use cases. + // + // This can be set when publishing messages using [WithScheduleSource] option. + ScheduleSourceHeader = "Nats-Schedule-Source" + + // ScheduleTTLHeader sets a TTL on messages generated by the scheduler. + // The server default is "5m". Use "never" to disable TTL on generated + // messages. + // + // This can be set when publishing messages using [WithScheduleTTL] or + // [WithScheduleTTLNever] options. + // Requires [StreamConfig.AllowMsgTTL] to be enabled on the stream. + ScheduleTTLHeader = "Nats-Schedule-TTL" + + // ScheduleTimeZoneHeader sets the time zone for cron schedule expressions. + // Must be a valid IANA time zone name (e.g., "America/New_York"). + // Only valid with cron schedule expressions. + // + // This can be set when publishing messages using [WithScheduleTimeZone] option. + ScheduleTimeZoneHeader = "Nats-Schedule-Time-Zone" ) // Headers for republished messages and direct gets. Those headers are set by @@ -233,6 +272,14 @@ const ( // SubjectHeader contains the original subject the message was published to. SubjectHeader = "Nats-Subject" + // SchedulerHeader is set by the server on messages generated by a schedule. + // Contains the subject holding the schedule definition. + SchedulerHeader = "Nats-Scheduler" + + // ScheduleNextHeader is set by the server on generated messages. Contains + // the timestamp of the next invocation, or "purge" for one-time schedules. + ScheduleNextHeader = "Nats-Schedule-Next" + // LastSequenceHeader contains the last sequence of the message having the // same subject, otherwise zero if this is the first message for the // subject. diff --git a/vendor/github.com/nats-io/nats.go/jetstream/publish.go b/vendor/github.com/nats-io/nats.go/jetstream/publish.go index 0692505a3d..5194dcc695 100644 --- a/vendor/github.com/nats-io/nats.go/jetstream/publish.go +++ b/vendor/github.com/nats-io/nats.go/jetstream/publish.go @@ -50,6 +50,11 @@ type ( lastSubjectSeq *uint64 // Expected last sequence for subject lastSubject string // Expected subject for last sequence ttl time.Duration // Message TTL + schedule string // Schedule expression + scheduleTarget string // Target subject for scheduled messages + scheduleSource string // Source subject for sampling + scheduleTTL string // TTL for generated messages + scheduleTZ string // Time zone for cron schedules // Publish retries for NoResponders err. retryWait time.Duration // Retry wait between attempts @@ -205,6 +210,21 @@ func (js *jetStream) PublishMsg(ctx context.Context, m *nats.Msg, opts ...Publis if o.ttl > 0 { m.Header.Set(MsgTTLHeader, o.ttl.String()) } + if o.schedule != "" { + m.Header.Set(ScheduleHeader, o.schedule) + } + if o.scheduleTarget != "" { + m.Header.Set(ScheduleTargetHeader, o.scheduleTarget) + } + if o.scheduleSource != "" { + m.Header.Set(ScheduleSourceHeader, o.scheduleSource) + } + if o.scheduleTTL != "" { + m.Header.Set(ScheduleTTLHeader, o.scheduleTTL) + } + if o.scheduleTZ != "" { + m.Header.Set(ScheduleTimeZoneHeader, o.scheduleTZ) + } var resp *nats.Msg var err error @@ -295,6 +315,21 @@ func (js *jetStream) PublishMsgAsync(m *nats.Msg, opts ...PublishOpt) (PubAckFut if o.ttl > 0 { m.Header.Set(MsgTTLHeader, o.ttl.String()) } + if o.schedule != "" { + m.Header.Set(ScheduleHeader, o.schedule) + } + if o.scheduleTarget != "" { + m.Header.Set(ScheduleTargetHeader, o.scheduleTarget) + } + if o.scheduleSource != "" { + m.Header.Set(ScheduleSourceHeader, o.scheduleSource) + } + if o.scheduleTTL != "" { + m.Header.Set(ScheduleTTLHeader, o.scheduleTTL) + } + if o.scheduleTZ != "" { + m.Header.Set(ScheduleTimeZoneHeader, o.scheduleTZ) + } paf := o.pafRetry if paf == nil && m.Reply != "" { diff --git a/vendor/github.com/nats-io/nats.go/jetstream/stream.go b/vendor/github.com/nats-io/nats.go/jetstream/stream.go index 5cad22e5bf..280fdf9a92 100644 --- a/vendor/github.com/nats-io/nats.go/jetstream/stream.go +++ b/vendor/github.com/nats-io/nats.go/jetstream/stream.go @@ -121,6 +121,18 @@ type ( // If consumer does not exist, ErrConsumerNotFound is returned. UnpinConsumer(ctx context.Context, consumer string, group string) error + // ResetConsumer resets a consumer's delivery state without deleting + // and recreating it. The consumer is reset to deliver from + // ack_floor + 1. + ResetConsumer(ctx context.Context, consumer string) (*ConsumerResetResponse, error) + + // ResetConsumerToSequence resets a consumer's delivery state to the + // given stream sequence. The seq must be compatible with the + // consumer's DeliverPolicy (e.g. for DeliverByStartSequencePolicy, + // seq must be >= OptStartSeq). If incompatible, + // ErrConsumerInvalidReset is returned. + ResetConsumerToSequence(ctx context.Context, consumer string, seq uint64) (*ConsumerResetResponse, error) + // CreateOrUpdatePushConsumer creates a push consumer on a given stream with // given config. If consumer already exists, it will be updated (if // possible). Consumer interface is returned, allowing to consume messages. @@ -218,6 +230,29 @@ type ( ConsumerPauseResponse } + consumerResetRequest struct { + Seq uint64 `json:"seq,omitempty"` + } + + // ConsumerResetResponse contains the result of a consumer reset operation. + // It carries the updated ConsumerInfo together with the stream sequence the + // server actually reset the consumer to. + // + // The embedded *ConsumerInfo is guaranteed non-nil on a successful response; + // the helper returns ErrConsumerResetResponseEmpty if the server omits it. + ConsumerResetResponse struct { + *ConsumerInfo + // ResetSeq is the stream sequence the consumer was reset to. When + // ResetConsumer is called without an explicit sequence, this is the + // server-resolved value (typically ack_floor + 1). + ResetSeq uint64 `json:"reset_seq"` + } + + consumerResetApiResponse struct { + apiResponse + ConsumerResetResponse + } + // GetMsgOpt is a function setting options for [Stream.GetMsg] GetMsgOpt func(*apiMsgGetRequest) error @@ -825,3 +860,14 @@ func (s *consumerLister) consumerNames(ctx context.Context, stream string) ([]st func (s *stream) UnpinConsumer(ctx context.Context, consumer string, group string) error { return unpinConsumer(ctx, s.js, s.name, consumer, group) } + +// ResetConsumer resets a consumer's delivery state. See [Stream.ResetConsumer]. +func (s *stream) ResetConsumer(ctx context.Context, consumer string) (*ConsumerResetResponse, error) { + return resetConsumer(ctx, s.js, s.name, consumer, 0) +} + +// ResetConsumerToSequence resets a consumer's delivery state to the given +// stream sequence. See [Stream.ResetConsumerToSequence]. +func (s *stream) ResetConsumerToSequence(ctx context.Context, consumer string, seq uint64) (*ConsumerResetResponse, error) { + return resetConsumer(ctx, s.js, s.name, consumer, seq) +} diff --git a/vendor/github.com/nats-io/nats.go/jetstream/stream_config.go b/vendor/github.com/nats-io/nats.go/jetstream/stream_config.go index efcb9522a4..f107f2949e 100644 --- a/vendor/github.com/nats-io/nats.go/jetstream/stream_config.go +++ b/vendor/github.com/nats-io/nats.go/jetstream/stream_config.go @@ -1,4 +1,4 @@ -// Copyright 2022-2024 The NATS Authors +// Copyright 2022-2026 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -213,6 +213,10 @@ type ( // PersistMode allows to opt-in to different persistence mode settings. PersistMode PersistModeType `json:"persist_mode,omitempty"` + + // AllowBatchPublish allows fast batch publishing into the stream. + // This feature requires nats-server v2.14.0 or later. + AllowBatchPublish bool `json:"allow_batched,omitempty"` } // StreamSourceInfo shows information about an upstream stream @@ -393,12 +397,28 @@ type ( // account or JetStream domain. External *ExternalStream `json:"external,omitempty"` + // Consumer is consumer information for durable sourcing. + // + // This feature requires nats-server v2.14.0 or later. + Consumer *StreamConsumerSource `json:"consumer,omitempty"` + // Domain is used to configure a stream source in another JetStream // domain. This setting will set the External field with the appropriate // APIPrefix. Domain string `json:"-"` } + // StreamConsumerSource is consumer information for durable sourcing. + // + // This feature requires nats-server v2.14.0 or later. + StreamConsumerSource struct { + // Name is the consumer name. + Name string `json:"name,omitempty"` + + // DeliverSubject is the subject to deliver messages to. + DeliverSubject string `json:"deliver_subject,omitempty"` + } + // ExternalStream allows you to qualify access to a stream source in another // account. ExternalStream struct { diff --git a/vendor/github.com/nats-io/nats.go/nats.go b/vendor/github.com/nats-io/nats.go/nats.go index 495f9edb01..d2d338d463 100644 --- a/vendor/github.com/nats-io/nats.go/nats.go +++ b/vendor/github.com/nats-io/nats.go/nats.go @@ -49,7 +49,7 @@ import ( // Default Constants const ( - Version = "1.51.0" + Version = "1.52.0" DefaultURL = "nats://127.0.0.1:4222" DefaultPort = 4222 DefaultMaxReconnect = 60 @@ -5100,6 +5100,15 @@ func (s *Subscription) StatusChanged(statuses ...SubStatus) <-chan SubStatus { ch := make(chan SubStatus, 10) s.mu.Lock() defer s.mu.Unlock() + + if s.status == SubscriptionClosed { + if slices.Contains(statuses, SubscriptionClosed) { + ch <- SubscriptionClosed + } + close(ch) + return ch + } + for _, status := range statuses { s.registerStatusChangeListener(status, ch) // initial status diff --git a/vendor/modules.txt b/vendor/modules.txt index b15cb1e871..80116fcc9e 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1168,7 +1168,7 @@ github.com/nats-io/nats-server/v2/server/stree github.com/nats-io/nats-server/v2/server/sysmem github.com/nats-io/nats-server/v2/server/thw github.com/nats-io/nats-server/v2/server/tpm -# github.com/nats-io/nats.go v1.51.0 +# github.com/nats-io/nats.go v1.52.0 ## explicit; go 1.25.0 github.com/nats-io/nats.go github.com/nats-io/nats.go/encoders/builtin