-
Notifications
You must be signed in to change notification settings - Fork 154
Expand file tree
/
Copy pathinsert_events.sql.eex
More file actions
140 lines (136 loc) · 4.22 KB
/
insert_events.sql.eex
File metadata and controls
140 lines (136 loc) · 4.22 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
<% #
# Elixir template variables:
# schema - string
# stream_id - integer
# number_of_events - integer
# Bind variables
# 1 - stream_id - integer
# 2 - stream_version - integer
# Additionally, we multiply the index by 9 for each event:
# 3 - event_id - uuid
# 4 - event_type - text
# 5 - causation_id - uuid
# 6 - correlation_id - uuid
# 7 - data - <serialized event data format (bytea, jsonb, etc.)>
# 8 - metadata - jsonb (always?)
# 9 - created_at - timestamp
# 10 - index - integer
# 11 - stream_version - integer
%>
WITH
<% #
# create a table variable with:
# event_id - uuid - the id for the new event
# index - integer - the increase in the stream version for any stream it is linked to
# stream_version - integer - the final stream version after all of the events have been inserted
%>
new_events_indexes (event_id, index, stream_version) AS (
VALUES
<%= for i <- 0..(number_of_events - 1) do %>
<%= unless i == 0 do %>,<% end %>
($<%= i*9+3 %>::uuid, $<%= i*9+10 %>::int, $<%= i*9+11 %>::bigint)
<% end %>
),
events AS (
<%
# insert the new events into the events table
# using the 7 bind variables from 3 to 9 inclusive
# n.b.: the bind for the event_id is re-generated here
%>
INSERT INTO "<%= schema %>".events
(
event_id,
event_type,
causation_id,
correlation_id,
data,
metadata,
created_at
)
VALUES
<%= for i <- 0..(number_of_events - 1) do %>
<%= unless i == 0 do %>,<% end %>
($<%= i*9+3 %>, $<%= i*9+4 %>, $<%= i*9+5 %>, $<%= i*9+6 %>, $<%= i*9+7 %>, $<%= i*9+8 %>, $<%= i*9+9 %>)
<% end %>
),
stream AS (
<% # Increase the version to the stream version given %>
<%= cond do %>
<% stream_id -> %>
UPDATE "<%= schema %>".streams
SET stream_version = stream_version + $2::bigint
WHERE stream_id = $1::bigint
returning stream_id
<% created_at -> %>
<%
# the event created_at date has been provided as the last bind variable
# use that instead of generating one
%>
INSERT INTO "<%= schema %>".streams (stream_uuid, stream_version, created_at)
VALUES ($1, $2::bigint, $<%= number_of_events*9 + 3 %>)
returning stream_id
<% true -> %>
INSERT INTO "<%= schema %>".streams (stream_uuid, stream_version)
VALUES ($1, $2::bigint)
returning stream_id
<% end %>
),
source_stream_events AS (
<%
# link the new events into it's source stream
# we're using the passed in event_ids rather than reading/joining from tables
# each insert uses the stream_version calculated for the corresponding event
# we're joining here, so we'll get the product of:
# the stream (1)
# the rows in the table variable (number_of_events)
%>
INSERT INTO "<%= schema %>".stream_events
(
event_id,
stream_id,
stream_version,
original_stream_id,
original_stream_version
)
SELECT
new_events_indexes.event_id,
stream.stream_id,
new_events_indexes.stream_version,
stream.stream_id,
new_events_indexes.stream_version
FROM new_events_indexes, stream
),
linked_stream AS (
<%
# Update the all streams version by the number of events
# This is the value of the expected version at append time + the number of events
# Returns the version before the update
%>
UPDATE "<%= schema %>".streams
SET stream_version = stream_version + $2::bigint
WHERE stream_id = 0
RETURNING stream_version - $2::bigint as initial_stream_version
),
linked_stream_events AS (
<%
# Link the new events into the $all stream
# 1 row for each event
%>
INSERT INTO "<%= schema %>".stream_events
(
event_id,
stream_id,
stream_version,
original_stream_id,
original_stream_version
)
SELECT
new_events_indexes.event_id,
0,
linked_stream.initial_stream_version + new_events_indexes.index,
stream.stream_id,
new_events_indexes.stream_version
FROM
new_events_indexes, linked_stream, stream
)
SELECT stream_id from stream;