Skip to content
This repository was archived by the owner on Jun 18, 2025. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions esdb/append_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (

// AppendToStreamOptions options of the append stream request.
type AppendToStreamOptions struct {
// Asks the server to check that the stream receiving the event is at the given expected version.
ExpectedRevision ExpectedRevision
// Asks the server to check that the stream receiving the event is at the expected state.
StreamState StreamState
// Asks for authenticated request.
Authenticated *Credentials
// A length of time to use for gRPC deadlines.
Expand All @@ -33,7 +33,7 @@ func (o *AppendToStreamOptions) requiresLeader() bool {
}

func (o *AppendToStreamOptions) setDefaults() {
if o.ExpectedRevision == nil {
o.ExpectedRevision = Any{}
if o.StreamState == nil {
o.StreamState = Any{}
}
}
8 changes: 4 additions & 4 deletions esdb/append_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func appendToStreamSingleEventNoStream(db *esdb.Client) TestCall {
defer cancel()

opts := esdb.AppendToStreamOptions{
ExpectedRevision: esdb.NoStream{},
StreamState: esdb.NoStream{},
}

_, err := db.AppendToStream(context, streamID.String(), opts, testEvent)
Expand Down Expand Up @@ -103,7 +103,7 @@ func appendWithInvalidStreamRevision(db *esdb.Client) TestCall {
defer cancel()

opts := esdb.AppendToStreamOptions{
ExpectedRevision: esdb.StreamExists{},
StreamState: esdb.StreamExists{},
}

_, err := db.AppendToStream(context, streamID.String(), opts, createTestEvent())
Expand Down Expand Up @@ -139,7 +139,7 @@ func appendToSystemStreamWithIncorrectCredentials(container *Container) TestCall
defer cancel()

opts := esdb.AppendToStreamOptions{
ExpectedRevision: esdb.Any{},
StreamState: esdb.Any{},
}

_, err = db.AppendToStream(context, streamID.String(), opts, createTestEvent())
Expand All @@ -156,7 +156,7 @@ func metadataOperation(db *esdb.Client) TestCall {
defer cancel()

opts := esdb.AppendToStreamOptions{
ExpectedRevision: esdb.Any{},
StreamState: esdb.Any{},
}

_, err := db.AppendToStream(context, streamID.String(), opts, createTestEvent())
Expand Down
6 changes: 3 additions & 3 deletions esdb/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (client *Client) AppendToStream(
return nil, fmt.Errorf("could not construct append operation. Reason: %w", err)
}

header := toAppendHeader(streamID, opts.ExpectedRevision)
header := toAppendHeader(streamID, opts.StreamState)

if err := appendOperation.Send(header); err != nil {
err = client.grpcClient.handleError(handle, trailers, err)
Expand Down Expand Up @@ -248,7 +248,7 @@ func (client *Client) DeleteStream(
callOptions := []grpc.CallOption{grpc.Header(&headers), grpc.Trailer(&trailers)}
callOptions, ctx, cancel := configureGrpcCall(parent, client.config, &opts, callOptions, client.grpcClient.perRPCCredentials)
defer cancel()
deleteRequest := toDeleteRequest(streamID, opts.ExpectedRevision)
deleteRequest := toDeleteRequest(streamID, opts.StreamState)
deleteResponse, err := streamsClient.Delete(ctx, deleteRequest, callOptions...)
if err != nil {
err = client.grpcClient.handleError(handle, trailers, err)
Expand Down Expand Up @@ -278,7 +278,7 @@ func (client *Client) TombstoneStream(
callOptions := []grpc.CallOption{grpc.Header(&headers), grpc.Trailer(&trailers)}
callOptions, ctx, cancel := configureGrpcCall(parent, client.config, &opts, callOptions, client.grpcClient.perRPCCredentials)
defer cancel()
tombstoneRequest := toTombstoneRequest(streamID, opts.ExpectedRevision)
tombstoneRequest := toTombstoneRequest(streamID, opts.StreamState)
tombstoneResponse, err := streamsClient.Tombstone(ctx, tombstoneRequest, callOptions...)

if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion esdb/client_certificates_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func testInvalidUserCertificates(t *testing.T, endpoint string) {

streamID := uuid.NewString()
opts := esdb.AppendToStreamOptions{
ExpectedRevision: esdb.Any{},
StreamState: esdb.Any{},
}

result, err := c.AppendToStream(context.Background(), streamID, opts, testEvent)
Expand Down
4 changes: 2 additions & 2 deletions esdb/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func closeConnection(container *Container) TestCall {
context, cancel := context.WithTimeout(context.Background(), time.Duration(5)*time.Second)
defer cancel()
opts := esdb.AppendToStreamOptions{
ExpectedRevision: esdb.NoStream{},
StreamState: esdb.NoStream{},
}
_, err := db.AppendToStream(context, streamID.String(), opts, testEvent)

Expand All @@ -36,7 +36,7 @@ func closeConnection(container *Container) TestCall {
}

db.Close()
opts.ExpectedRevision = esdb.Any{}
opts.StreamState = esdb.Any{}
_, err = db.AppendToStream(context, streamID.String(), opts, testEvent)

esdbErr, ok := esdb.FromError(err)
Expand Down
8 changes: 4 additions & 4 deletions esdb/delete_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (

// DeleteStreamOptions options of the delete stream request.
type DeleteStreamOptions struct {
// Asks the server to check that the stream receiving the event is at the given expected version.
ExpectedRevision ExpectedRevision
// Asks the server to check that the stream receiving the event is at the expected state.
StreamState StreamState
// Asks for authenticated request.
Authenticated *Credentials
// A length of time to use for gRPC deadlines.
Expand All @@ -33,7 +33,7 @@ func (o *DeleteStreamOptions) requiresLeader() bool {
}

func (o *DeleteStreamOptions) setDefaults() {
if o.ExpectedRevision == nil {
o.ExpectedRevision = Any{}
if o.StreamState == nil {
o.StreamState = Any{}
}
}
4 changes: 2 additions & 2 deletions esdb/delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func DeleteTests(t *testing.T, db *esdb.Client) {
func canDeleteStream(db *esdb.Client) TestCall {
return func(t *testing.T) {
opts := esdb.DeleteStreamOptions{
ExpectedRevision: esdb.Revision(0),
StreamState: esdb.Revision(0),
}

streamID := NAME_GENERATOR.Generate()
Expand All @@ -43,7 +43,7 @@ func canTombstoneStream(db *esdb.Client) TestCall {

_, err := db.AppendToStream(context.Background(), streamId, esdb.AppendToStreamOptions{}, createTestEvent())
deleteResult, err := db.TombstoneStream(context.Background(), streamId, esdb.TombstoneStreamOptions{
ExpectedRevision: esdb.Revision(0),
StreamState: esdb.Revision(0),
})

if err != nil {
Expand Down
24 changes: 12 additions & 12 deletions esdb/persistent_subscription_read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func persistentSubscription_ToExistingStream_StartFromBeginning_AndEventsInIt(cl
streamID := NAME_GENERATOR.Generate()
// append events to StreamsClient.AppendToStreamAsync(Stream, StreamState.NoStream, Events);
opts := esdb.AppendToStreamOptions{
ExpectedRevision: esdb.NoStream{},
StreamState: esdb.NoStream{},
}

_, err := clientInstance.AppendToStream(context.Background(), streamID, opts, events...)
Expand Down Expand Up @@ -131,7 +131,7 @@ func persistentSubscription_ToNonExistingStream_StartFromBeginning_AppendEventsA
require.NoError(t, err)
// append events to StreamsClient.AppendToStreamAsync(Stream, stream_revision.StreamRevisionNoStream, Events);
opts := esdb.AppendToStreamOptions{
ExpectedRevision: esdb.NoStream{},
StreamState: esdb.NoStream{},
}
_, err = clientInstance.AppendToStream(context.Background(), streamID, opts, events...)
require.NoError(t, err)
Expand Down Expand Up @@ -160,7 +160,7 @@ func persistentSubscription_ToExistingStream_StartFromEnd_EventsInItAndAppendEve
streamID := NAME_GENERATOR.Generate()
// append events to StreamsClient.AppendToStreamAsync(Stream, StreamState.NoStream, Events);
opts := esdb.AppendToStreamOptions{
ExpectedRevision: esdb.NoStream{},
StreamState: esdb.NoStream{},
}
_, err := clientInstance.AppendToStream(context.Background(), streamID, opts, events[:10]...)
require.NoError(t, err)
Expand All @@ -177,7 +177,7 @@ func persistentSubscription_ToExistingStream_StartFromEnd_EventsInItAndAppendEve
require.NoError(t, err)

// append 1 event to StreamsClient.AppendToStreamAsync(Stream, new StreamRevision(9), event[10])
opts.ExpectedRevision = esdb.Revision(9)
opts.StreamState = esdb.Revision(9)
_, err = clientInstance.AppendToStream(context.Background(), streamID, opts, events[10:]...)
require.NoError(t, err)

Expand Down Expand Up @@ -205,7 +205,7 @@ func persistentSubscription_ToExistingStream_StartFromEnd_EventsInIt(clientInsta
streamID := NAME_GENERATOR.Generate()
// append events to StreamsClient.AppendToStreamAsync(Stream, StreamState.NoStream, Events);
opts := esdb.AppendToStreamOptions{
ExpectedRevision: esdb.NoStream{},
StreamState: esdb.NoStream{},
}

_, err := clientInstance.AppendToStream(context.Background(), streamID, opts, events[:10]...)
Expand Down Expand Up @@ -274,7 +274,7 @@ func persistentSubscription_ToNonExistingStream_StartFromTwo_AppendEventsAfterwa
require.NoError(t, err)
// append 3 event to StreamsClient.AppendToStreamAsync(Stream, StreamState.NoStream, events)
opts := esdb.AppendToStreamOptions{
ExpectedRevision: esdb.NoStream{},
StreamState: esdb.NoStream{},
}
_, err = clientInstance.AppendToStream(context.Background(), streamID, opts, events...)
require.NoError(t, err)
Expand Down Expand Up @@ -302,7 +302,7 @@ func persistentSubscription_ToExistingStream_StartFrom10_EventsInItAppendEventsA

// append 10 events to StreamsClient.AppendToStreamAsync(Stream, StreamState.NoStream, events[:10]);
opts := esdb.AppendToStreamOptions{
ExpectedRevision: esdb.NoStream{},
StreamState: esdb.NoStream{},
}
streamID := NAME_GENERATOR.Generate()
_, err := clientInstance.AppendToStream(context.Background(), streamID, opts, events[:10]...)
Expand All @@ -322,7 +322,7 @@ func persistentSubscription_ToExistingStream_StartFrom10_EventsInItAppendEventsA

// append 1 event to StreamsClient.AppendToStreamAsync(Stream, StreamRevision(9), events[10:)
opts = esdb.AppendToStreamOptions{
ExpectedRevision: esdb.Revision(9),
StreamState: esdb.Revision(9),
}
_, err = clientInstance.AppendToStream(context.Background(), streamID, opts, events[10:]...)
require.NoError(t, err)
Expand Down Expand Up @@ -351,7 +351,7 @@ func persistentSubscription_ToExistingStream_StartFrom4_EventsInIt(clientInstanc

// append 10 events to StreamsClient.AppendToStreamAsync(Stream, StreamState.NoStream, events[:10]);
opts := esdb.AppendToStreamOptions{
ExpectedRevision: esdb.NoStream{},
StreamState: esdb.NoStream{},
}
streamID := NAME_GENERATOR.Generate()
_, err := clientInstance.AppendToStream(context.Background(), streamID, opts, events[:10]...)
Expand All @@ -372,7 +372,7 @@ func persistentSubscription_ToExistingStream_StartFrom4_EventsInIt(clientInstanc

// append 1 event to StreamsClient.AppendToStreamAsync(Stream, StreamRevision(9), events)
opts = esdb.AppendToStreamOptions{
ExpectedRevision: esdb.Revision(9),
StreamState: esdb.Revision(9),
}
_, err = clientInstance.AppendToStream(context.Background(), streamID, opts, events[10:]...)
require.NoError(t, err)
Expand Down Expand Up @@ -402,7 +402,7 @@ func persistentSubscription_ToExistingStream_StartFromHigherRevisionThenEventsIn
// append 10 events to StreamsClient.AppendToStreamAsync(Stream, StreamState.NoStream, events[:10]);
streamID := NAME_GENERATOR.Generate()
opts := esdb.AppendToStreamOptions{
ExpectedRevision: esdb.NoStream{},
StreamState: esdb.NoStream{},
}
_, err := clientInstance.AppendToStream(context.Background(), streamID, opts, events[:11]...)
require.NoError(t, err)
Expand All @@ -421,7 +421,7 @@ func persistentSubscription_ToExistingStream_StartFromHigherRevisionThenEventsIn

// append event to StreamsClient.AppendToStreamAsync(Stream, StreamRevision(10), events[11:])
opts = esdb.AppendToStreamOptions{
ExpectedRevision: esdb.Revision(10),
StreamState: esdb.Revision(10),
}

_, err = clientInstance.AppendToStream(context.Background(), streamID, opts, events[11])
Expand Down
2 changes: 1 addition & 1 deletion esdb/persistent_subscriptions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ func pushEventsToStream(t *testing.T,
events []esdb.EventData) {

opts := esdb.AppendToStreamOptions{
ExpectedRevision: esdb.NoStream{},
StreamState: esdb.NoStream{},
}
_, err := clientInstance.AppendToStream(context.Background(), streamID, opts, events...)

Expand Down
6 changes: 3 additions & 3 deletions esdb/protobuf_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ const systemMetadataKeysContentType = "content-type"
const systemMetadataKeysCreated = "created"

// toAppendHeader ...
func toAppendHeader(streamID string, streamRevision ExpectedRevision) *api.AppendReq {
func toAppendHeader(streamID string, streamRevision StreamState) *api.AppendReq {
appendReq := &api.AppendReq{
Content: &api.AppendReq_Options_{
Options: &api.AppendReq_Options{},
Expand Down Expand Up @@ -206,7 +206,7 @@ func toFilterOptions(options *SubscriptionFilterOptions) (*api.ReadReq_Options_F
return &filterOptions, nil
}

func toDeleteRequest(streamID string, streamRevision ExpectedRevision) *api.DeleteReq {
func toDeleteRequest(streamID string, streamRevision StreamState) *api.DeleteReq {
deleteReq := &api.DeleteReq{
Options: &api.DeleteReq_Options{
StreamIdentifier: &shared.StreamIdentifier{
Expand Down Expand Up @@ -237,7 +237,7 @@ func toDeleteRequest(streamID string, streamRevision ExpectedRevision) *api.Dele
return deleteReq
}

func toTombstoneRequest(streamID string, streamRevision ExpectedRevision) *api.TombstoneReq {
func toTombstoneRequest(streamID string, streamRevision StreamState) *api.TombstoneReq {
tombstoneReq := &api.TombstoneReq{
Options: &api.TombstoneReq_Options{
StreamIdentifier: &shared.StreamIdentifier{
Expand Down
4 changes: 2 additions & 2 deletions esdb/read_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
"errors"
"github.com/google/uuid"
"io"
"os"
"math"
"os"
"testing"
"time"

Expand Down Expand Up @@ -199,7 +199,7 @@ func readStreamReturnsEOFAfterCompletion(db *esdb.Client) TestCall {
}

opts := esdb.AppendToStreamOptions{
ExpectedRevision: esdb.NoStream{},
StreamState: esdb.NoStream{},
}

streamID := NAME_GENERATOR.Generate()
Expand Down
6 changes: 3 additions & 3 deletions esdb/revision.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ type StreamExists struct{}
// NoStream means the stream being written to should not yet exist.
type NoStream struct{}

// ExpectedRevision the use of expected revision can be a bit tricky especially when discussing guaranties given by
// StreamState the use of expected revision can be a bit tricky especially when discussing guaranties given by
// EventStoreDB server. The EventStoreDB server will assure idempotency for all requests using any value in
// ExpectedRevision except Any. When using Any, the EventStoreDB server will do its best to assure idempotency but
// StreamState except Any. When using Any, the EventStoreDB server will do its best to assure idempotency but
// will not guarantee it.
type ExpectedRevision interface {
type StreamState interface {
isExpectedRevision()
}

Expand Down
2 changes: 1 addition & 1 deletion esdb/subscriptions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func streamSubscriptionDeliversAllEventsInStreamAndListensForNewEvents(db *esdb.

// Write a new event
opts2 := esdb.AppendToStreamOptions{
ExpectedRevision: esdb.Revision(5_999),
StreamState: esdb.Revision(5_999),
}
writeResult, err := db.AppendToStream(context.Background(), streamID, opts2, testEvent)
require.NoError(t, err)
Expand Down
8 changes: 4 additions & 4 deletions esdb/tombstone_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import "time"

// TombstoneStreamOptions options of the tombstone stream request.
type TombstoneStreamOptions struct {
// Asks the server to check that the stream receiving the event is at the given expected version.
ExpectedRevision ExpectedRevision
// Asks the server to check that the stream receiving the event is at the expected state.
StreamState StreamState
// Asks for authenticated request.
Authenticated *Credentials
// A length of time to use for gRPC deadlines.
Expand All @@ -31,7 +31,7 @@ func (o *TombstoneStreamOptions) requiresLeader() bool {
}

func (o *TombstoneStreamOptions) setDefaults() {
if o.ExpectedRevision == nil {
o.ExpectedRevision = Any{}
if o.StreamState == nil {
o.StreamState = Any{}
}
}
6 changes: 3 additions & 3 deletions samples/appendingEvents.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func AppendToStream(db *esdb.Client) {
}

options := esdb.AppendToStreamOptions{
ExpectedRevision: esdb.NoStream{},
StreamState: esdb.NoStream{},
}

result, err := db.AppendToStream(context.Background(), "some-stream", options, esdb.EventData{
Expand Down Expand Up @@ -89,7 +89,7 @@ func AppendWithNoStream(db *esdb.Client) {
}

options := esdb.AppendToStreamOptions{
ExpectedRevision: esdb.NoStream{},
StreamState: esdb.NoStream{},
}

_, err = db.AppendToStream(context.Background(), "same-event-stream", options, esdb.EventData{
Expand Down Expand Up @@ -151,7 +151,7 @@ func AppendWithConcurrencyCheck(db *esdb.Client) {
}

aopts := esdb.AppendToStreamOptions{
ExpectedRevision: lastEvent.OriginalStreamRevision(),
StreamState: lastEvent.OriginalStreamRevision(),
}

_, err = db.AppendToStream(context.Background(), "concurrency-stream", aopts, esdb.EventData{
Expand Down