-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathformat.go
More file actions
343 lines (287 loc) · 9.71 KB
/
format.go
File metadata and controls
343 lines (287 loc) · 9.71 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
package query
import (
"fmt"
"strconv"
"strings"
"github.com/specterops/dawgs/drivers/pg/model"
"github.com/specterops/dawgs/graph"
)
// postgresIndexType translates a graph index type into the string this package
// places after "using" when it renders create index statements. Index types
// without a mapping yield the literal "NOT SUPPORTED".
func postgresIndexType(indexType graph.IndexType) string {
	if indexType == graph.BTreeIndex {
		return pgIndexTypeBTree
	}

	if indexType == graph.TextSearchIndex {
		return pgIndexTypeGIN
	}

	return "NOT SUPPORTED"
}
// parsePostgresIndexType is the inverse of postgresIndexType: it maps a
// PostgreSQL index type name back to the graph index type. The comparison is
// done on the lower-cased input, and names without a mapping produce
// graph.UnsupportedIndex.
func parsePostgresIndexType(pgType string) graph.IndexType {
	normalized := strings.ToLower(pgType)

	if normalized == pgIndexTypeBTree {
		return graph.BTreeIndex
	}

	if normalized == pgIndexTypeGIN {
		return graph.TextSearchIndex
	}

	return graph.UnsupportedIndex
}
// join concatenates values in order with no separator between them. Calling it
// with no arguments returns the empty string.
func join(values ...string) string {
	var builder strings.Builder

	for _, value := range values {
		builder.WriteString(value)
	}

	return builder.String()
}
// formatDropPropertyIndex renders a "drop index if exists" statement for the
// named index. The name is inserted verbatim, without quoting.
func formatDropPropertyIndex(indexName string) string {
	const dropIndexPrefix = "drop index if exists "

	statement := join(dropIndexPrefix, indexName, ";")
	return statement
}
// formatDropPropertyConstraint renders the statement that removes a property
// constraint. Constraints are created as unique indexes by
// formatCreatePropertyConstraint, so they are removed with "drop index". The
// name is inserted verbatim, without quoting.
func formatDropPropertyConstraint(constraintName string) string {
	const dropIndexPrefix = "drop index if exists "

	statement := join(dropIndexPrefix, constraintName, ";")
	return statement
}
// formatCreatePropertyConstraint renders a "create unique index" statement that
// enforces uniqueness of a single property. The indexed expression is the text
// extraction (->>) of fieldName from the table's properties column. All
// identifiers and the field name are inserted verbatim.
func formatCreatePropertyConstraint(constraintName, tableName, fieldName string, indexType graph.IndexType) string {
	var (
		indexMethod = postgresIndexType(indexType)

		// <table>.<properties column> ->> '<field>'
		propertyExpression = join(tableName, ".", pgPropertiesColumn, " ->> '", fieldName, "'")
	)

	return join(
		"create unique index ", constraintName,
		" on ", tableName,
		" using ", indexMethod,
		" ((", propertyExpression, "));",
	)
}
// formatCreatePropertyIndex renders a "create index" statement over the text
// extraction (->>) of fieldName from the table's properties column. All
// identifiers and the field name are inserted verbatim.
func formatCreatePropertyIndex(indexName, tableName, fieldName string, indexType graph.IndexType) string {
	// Everything up to and including the property name is the same for all
	// index types; only the text that closes the expression differs.
	statementHead := join(
		"create index ", indexName,
		" on ", tableName,
		" using ", postgresIndexType(indexType),
		" ((", tableName, ".", pgPropertiesColumn, " ->> '", fieldName,
	)

	statementTail := "'));"

	if indexType == graph.TextSearchIndex {
		// GIN text search requires the column to be typed and to contain the tri-gram operation extension
		statementTail = "'::text) gin_trgm_ops);"
	}

	return join(statementHead, statementTail)
}
// formatCreatePartitionTable renders the statement that creates table name as a
// list partition of parent holding the rows whose partition key equals graphID.
// The rendered statement carries no trailing semicolon.
func formatCreatePartitionTable(name, parent string, graphID int32) string {
	fragments := []string{
		"create table ", name,
		" partition of ", parent,
		" for values in (", strconv.FormatInt(int64(graphID), 10), ")",
	}

	return strings.Join(fragments, "")
}
// formatConflictMatcher renders the "on conflict (...) " clause of an upsert,
// including the trailing space.
//
// When propertyNames is non-empty the conflict target is the comma separated
// list of text extractions "(properties->>'<name>')", one per property, in the
// order given. When it is empty, defaultOnConflict is used verbatim as the
// conflict target.
//
// Property names are embedded in SQL string literals, so any single quote in a
// name is doubled. Without this a name containing "'" would end the literal
// early and produce a malformed (or injectable) statement. Names without quotes
// render exactly as before.
func formatConflictMatcher(propertyNames []string, defaultOnConflict string) string {
	if len(propertyNames) == 0 {
		return "on conflict (" + defaultOnConflict + ") "
	}

	targets := make([]string, len(propertyNames))

	for idx, propertyName := range propertyNames {
		// Standard SQL escaping: a quote inside a literal is written as two quotes.
		escapedName := strings.ReplaceAll(propertyName, "'", "''")
		targets[idx] = "(properties->>'" + escapedName + "')"
	}

	return "on conflict (" + strings.Join(targets, ", ") + ") "
}
// FormatNodesUpdate renders a batched update statement against the node
// partition of graphTarget. The statement takes five array parameters that are
// unnested side by side into one row per node:
//
//	$1 text[]  node IDs, cast to int8
//	$2 text[]  kinds to add, each element cast to int2[]
//	$3 text[]  kinds to remove, each element cast to int2[]
//	$4 jsonb[] properties to merge into the existing properties
//	$5 text[]  property keys to remove, each element cast to text[]
//
// For every matching node ID the deleted kinds and properties are subtracted
// before the added ones are appended.
func FormatNodesUpdate(graphTarget model.Graph) string {
	nodeTable := graphTarget.Partitions.Node.Name

	// New column values, expressed in terms of the unnested row "u".
	setClause := join(
		"set ",
		" kind_ids = uniq(sort(kind_ids - u.deleted_kinds || u.added_kinds)), ",
		" properties = n.properties - u.deleted_properties || u.properties ",
	)

	// Derived table that zips the five array parameters into rows.
	fromClause := join(
		"from ",
		" (select ",
		" unnest($1::text[])::int8 as id, ",
		" unnest($2::text[])::int2[] as added_kinds, ",
		" unnest($3::text[])::int2[] as deleted_kinds, ",
		" unnest($4::jsonb[]) as properties, ",
		" unnest($5::text[])::text[] as deleted_properties) as u ",
	)

	return join(
		"update ", nodeTable, " as n ",
		setClause,
		fromClause,
		"where n.id = u.id; ",
	)
}
// NodeUpdateStagingTable is the name of the temporary staging table used by largeUpdate.
//
// NOTE(review): largeUpdate is not defined in this part of the file. Presumably this
// name is what callers pass as the stagingTable argument of
// FormatCreateNodeUpdateStagingTable and FormatMergeNodeLargeUpdate — confirm at the call site.
const NodeUpdateStagingTable = "node_update_staging"
// NodeUpdateStagingColumns lists the columns (in order) written by a COPY FROM during largeUpdate.
//
// The names and their order match the columns declared by
// FormatCreateNodeUpdateStagingTable; keep the two in sync when either changes.
var NodeUpdateStagingColumns = []string{"id", "added_kinds", "deleted_kinds", "properties", "deleted_props"}
// FormatCreateNodeUpdateStagingTable renders the statement that creates the
// temporary staging table named stagingTable if it does not already exist. The
// table is declared "on commit drop". Apart from the bigint id, every column is
// text; FormatMergeNodeLargeUpdate casts them to their real types when merging.
func FormatCreateNodeUpdateStagingTable(stagingTable string) string {
	// Column order mirrors NodeUpdateStagingColumns.
	columnDefinitions := join(
		"id bigint, ",
		"added_kinds text, ",
		"deleted_kinds text, ",
		"properties text, ",
		"deleted_props text",
	)

	return join(
		"create temp table if not exists ", stagingTable,
		" (", columnDefinitions, ") on commit drop;",
	)
}
// FormatMergeNodeLargeUpdate renders a "merge into" statement that applies the
// rows of stagingTable to the node partition of graphTarget, matching on node
// ID. Only a "when matched" branch is rendered. The staging table's text
// columns are cast in place: kinds to int2[], deleted property keys to text[],
// and properties to jsonb.
func FormatMergeNodeLargeUpdate(graphTarget model.Graph, stagingTable string) string {
	var (
		mergeTarget = join("merge into ", graphTarget.Partitions.Node.Name, " as n ")
		mergeSource = join("using ", stagingTable, " as u on n.id = u.id ")
	)

	return join(
		mergeTarget,
		mergeSource,
		"when matched then update set ",
		"kind_ids = uniq(sort(n.kind_ids - u.deleted_kinds::int2[] || u.added_kinds::int2[])), ",
		"properties = n.properties - u.deleted_props::text[] || u.properties::jsonb;",
	)
}
// FormatNodeUpsert renders a batched insert into the node partition of
// graphTarget that falls back to an update on conflict and returns the row IDs.
//
// Parameters of the rendered statement:
//
//	$1         value written to graph_id for every row
//	$2 text[]  kind lists, each element cast to int2[]
//	$3 jsonb[] properties
//
// The conflict target is built from identityProperties by
// formatConflictMatcher, defaulting to "id, graph_id" when none are given. On
// conflict the incoming properties are merged over the existing ones and the
// kind lists are unioned.
func FormatNodeUpsert(graphTarget model.Graph, identityProperties []string) string {
	var (
		insertInto = join(
			"insert into ", graphTarget.Partitions.Node.Name, " as n ",
			"(graph_id, kind_ids, properties) ",
		)
		conflictTarget = formatConflictMatcher(identityProperties, "id, graph_id")
	)

	return join(
		insertInto,
		"select $1, unnest($2::text[])::int2[], unnest($3::jsonb[]) ",
		conflictTarget,
		"do update set properties = n.properties || excluded.properties, kind_ids = uniq(sort(n.kind_ids || excluded.kind_ids)) ",
		"returning id;",
	)
}
// FormatRelationshipPartitionUpsert renders a batched insert into the edge
// partition of graphTarget that merges properties on conflict.
//
// Parameters of the rendered statement:
//
//	$1         value written to graph_id for every row
//	$2 int8[]  start node IDs
//	$3 int8[]  end node IDs
//	$4 int2[]  kind IDs
//	$5 jsonb[] properties
//
// The conflict target is built from identityProperties by
// formatConflictMatcher, defaulting to "start_id, end_id, kind_id, graph_id"
// when none are given.
func FormatRelationshipPartitionUpsert(graphTarget model.Graph, identityProperties []string) string {
	var (
		insertInto = join(
			"insert into ", graphTarget.Partitions.Edge.Name, " as e ",
			"(graph_id, start_id, end_id, kind_id, properties) ",
		)
		conflictTarget = formatConflictMatcher(identityProperties, "start_id, end_id, kind_id, graph_id")
	)

	return join(
		insertInto,
		"select $1, unnest($2::int8[]), unnest($3::int8[]), unnest($4::int2[]), unnest($5::jsonb[]) ",
		conflictTarget,
		"do update set properties = e.properties || excluded.properties;",
	)
}
// NodeUpdate is one entry of a NodeUpdateBatch: a node to be written together
// with the Future that stands in for its database ID.
type NodeUpdate struct {
	// IDFuture is the handle NodeUpdateBatch.Add returns to callers. Add creates
	// it holding graph.ID(0); nothing in this file assigns the real ID.
	IDFuture *Future[graph.ID]

	// Node is the node being updated. When Add sees another update with the same
	// key, that update's node is merged into this one.
	Node *graph.Node
}
// NodeUpdateBatch collects node updates keyed by the string returned from
// graph.NodeUpdate.Key, merging updates that share a key. Every update added to
// a batch must carry the same set of identity properties as the first one.
//
// TODO: Revisit the identity handling in Add.
//
// Some assumptions were made here regarding identity kind matching since this data model does not directly require the
// kind of a node to enforce a constraint
type NodeUpdateBatch struct {
	// IdentityProperties is copied from the first update added to the batch and
	// is what later updates are validated against.
	IdentityProperties []string

	// Updates maps an update key to the merged update for that key.
	Updates map[string]*NodeUpdate
}
// NewNodeUpdateBatch returns an empty batch with its Updates map allocated and
// no identity properties set.
func NewNodeUpdateBatch() *NodeUpdateBatch {
	batch := new(NodeUpdateBatch)
	batch.Updates = make(map[string]*NodeUpdate)

	return batch
}
// Add queues update in the batch and returns the Future that stands in for the
// node's ID.
//
// Once the batch has identity properties, every further update must list
// exactly as many, and each of the batch's properties must appear in the
// update; otherwise an error is returned and the batch is left untouched. An
// error from update.Key is returned as is.
//
// If an update with the same key is already queued, the incoming node is merged
// into it and the existing Future is returned. Otherwise a new entry is stored
// with a Future holding graph.ID(0).
func (s *NodeUpdateBatch) Add(update graph.NodeUpdate) (*Future[graph.ID], error) {
	if expected := len(s.IdentityProperties); expected > 0 && expected != len(update.IdentityProperties) {
		return nil, fmt.Errorf("node update mixes identity properties with pre-existing updates")
	}

	// Each identity property the batch was seeded with must be present on the update.
	for _, batchProperty := range s.IdentityProperties {
		matched := false

		for idx := 0; idx < len(update.IdentityProperties) && !matched; idx++ {
			matched = update.IdentityProperties[idx] == batchProperty
		}

		if !matched {
			return nil, fmt.Errorf("node update mixes identity properties with pre-existing updates")
		}
	}

	key, err := update.Key()
	if err != nil {
		return nil, err
	}

	// NOTE(review): presumably this records the node's current kinds as added
	// kinds so the write applies them — confirm against graph.Node.AddKinds.
	update.Node.AddKinds(update.Node.Kinds...)

	// The first update to reach this point seeds the batch's identity properties.
	// A private copy is taken so the batch does not alias the caller's slice.
	if len(s.IdentityProperties) == 0 {
		seeded := make([]string, 0, len(update.IdentityProperties))
		s.IdentityProperties = append(seeded, update.IdentityProperties...)
	}

	if existing, found := s.Updates[key]; found {
		existing.Node.Merge(update.Node)
		return existing.IDFuture, nil
	}

	queued := &NodeUpdate{
		IDFuture: NewFuture(graph.ID(0)),
		Node:     update.Node,
	}
	s.Updates[key] = queued

	return queued.IDFuture, nil
}
// ValidateNodeUpdateByBatch adds every update, in order, to a fresh
// NodeUpdateBatch. It stops at the first update the batch rejects and returns
// that error with a nil batch; otherwise it returns the populated batch.
func ValidateNodeUpdateByBatch(updates []graph.NodeUpdate) (*NodeUpdateBatch, error) {
	batch := NewNodeUpdateBatch()

	for idx := range updates {
		if _, err := batch.Add(updates[idx]); err != nil {
			return nil, err
		}
	}

	return batch, nil
}
// Future is a shared, mutable holder for a single value of type T. It carries
// no synchronization of its own.
type Future[T any] struct {
	Value T
}

// NewFuture allocates a Future whose Value is initialized to value.
func NewFuture[T any](value T) *Future[T] {
	future := new(Future[T])
	future.Value = value

	return future
}
// RelationshipUpdate is one entry of a RelationshipUpdateBatch: a relationship
// to be written together with the ID futures of its two endpoint nodes.
type RelationshipUpdate struct {
	// StartID is the Future returned by the batch's NodeUpdateBatch for the start node.
	StartID *Future[graph.ID]

	// EndID is the Future returned by the batch's NodeUpdateBatch for the end node.
	EndID *Future[graph.ID]

	// Relationship is the relationship being updated. When Add sees another
	// update with the same key, that update's relationship is merged into this one.
	Relationship *graph.Relationship
}
// RelationshipUpdateBatch collects relationship updates keyed by the string
// returned from graph.RelationshipUpdate.Key, along with the node updates for
// the endpoints of those relationships.
type RelationshipUpdateBatch struct {
	// NodeUpdates holds the start and end node of every relationship update added.
	NodeUpdates *NodeUpdateBatch

	// IdentityProperties is copied from the first relationship update added and
	// is what later updates are validated against.
	IdentityProperties []string

	// Updates maps an update key to the merged relationship update for that key.
	Updates map[string]*RelationshipUpdate
}
// NewRelationshipUpdateBatch returns an empty batch with a fresh
// NodeUpdateBatch for endpoint nodes, an allocated Updates map, and no identity
// properties set.
func NewRelationshipUpdateBatch() *RelationshipUpdateBatch {
	batch := new(RelationshipUpdateBatch)
	batch.NodeUpdates = NewNodeUpdateBatch()
	batch.Updates = make(map[string]*RelationshipUpdate)

	return batch
}
// Add queues a relationship update together with node updates for its start
// and end nodes.
//
// Once the batch has identity properties, every further update must list
// exactly as many, and each of the batch's properties must appear in the
// update; otherwise an error is returned before anything is queued.
//
// The start node is added to NodeUpdates first, then the end node, then the
// relationship key is computed. The first error among these is returned; node
// updates queued before the failing step stay in NodeUpdates.
//
// If a relationship with the same key is already queued, the incoming
// relationship is merged into it. Otherwise a new entry is stored with the two
// node ID futures.
func (s *RelationshipUpdateBatch) Add(update graph.RelationshipUpdate) error {
	if expected := len(s.IdentityProperties); expected > 0 && expected != len(update.IdentityProperties) {
		return fmt.Errorf("relationship update mixes identity properties with pre-existing updates")
	}

	// Each identity property the batch was seeded with must be present on the update.
	for _, batchProperty := range s.IdentityProperties {
		matched := false

		for idx := 0; idx < len(update.IdentityProperties) && !matched; idx++ {
			matched = update.IdentityProperties[idx] == batchProperty
		}

		if !matched {
			return fmt.Errorf("relationship update mixes identity properties with pre-existing updates")
		}
	}

	startUpdate := graph.NodeUpdate{
		Node:               update.Start,
		IdentityKind:       update.StartIdentityKind,
		IdentityProperties: update.StartIdentityProperties,
	}

	startNodeID, err := s.NodeUpdates.Add(startUpdate)
	if err != nil {
		return err
	}

	endUpdate := graph.NodeUpdate{
		Node:               update.End,
		IdentityKind:       update.EndIdentityKind,
		IdentityProperties: update.EndIdentityProperties,
	}

	endNodeID, err := s.NodeUpdates.Add(endUpdate)
	if err != nil {
		return err
	}

	key, err := update.Key()
	if err != nil {
		return err
	}

	// The first update to reach this point seeds the batch's identity properties.
	// A private copy is taken so the batch does not alias the caller's slice.
	if len(s.IdentityProperties) == 0 {
		seeded := make([]string, 0, len(update.IdentityProperties))
		s.IdentityProperties = append(seeded, update.IdentityProperties...)
	}

	if existing, found := s.Updates[key]; found {
		existing.Relationship.Merge(update.Relationship)
		return nil
	}

	s.Updates[key] = &RelationshipUpdate{
		StartID:      startNodeID,
		EndID:        endNodeID,
		Relationship: update.Relationship,
	}

	return nil
}
// ValidateRelationshipUpdateByBatch adds every update, in order, to a fresh
// RelationshipUpdateBatch. It stops at the first update the batch rejects and
// returns that error with a nil batch; otherwise it returns the populated batch.
func ValidateRelationshipUpdateByBatch(updates []graph.RelationshipUpdate) (*RelationshipUpdateBatch, error) {
	batch := NewRelationshipUpdateBatch()

	for idx := range updates {
		if err := batch.Add(updates[idx]); err != nil {
			return nil, err
		}
	}

	return batch, nil
}