-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathengine_table.go
More file actions
103 lines (84 loc) · 2.67 KB
/
engine_table.go
File metadata and controls
103 lines (84 loc) · 2.67 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
/*
* Copyright (c) 2019 uplus.io
*/
package uengine
import (
"github.com/uplus-io/uengine/model"
log "github.com/uplus-io/ugo/logger"
)
// Names of the built-in namespaces, the system tables and the metadata keys
// used by the engine.
const (
ENGINE_NAMESPACE_SYSTEM = "_sys" // system namespace
ENGINE_NAMESPACE_USER = "_user" // user namespace
ENGINE_TABLE_NAMESPACES = "_ns" // name of the system table of namespaces
ENGINE_TABLE_TABLES = "_tab" // name of the system table of tables
ENGINE_TABLE_PARTITIONS = "_part" // name of the system table of partitions
ENGINE_TABLE_CLUSTERS = "_cluster" // table that stores cluster information
ENGINE_TABLE_METAS = "_meta" // name of the system metadata table
ENGINE_KEY_META_STORAGE = "storage" // primary key of the storage metadata
ENGINE_KEY_META_PART = "part" // primary key of the partition metadata
ENGINE_KEY_META_REPOSITORY = "repo" // primary key of the repository metadata (the original comment said "partition", apparently a copy-paste slip)
)
var (
// EMPTY_KEY is a zero-length, non-nil byte slice.
// NOTE(review): presumably passed where an identity needs no key — confirm against callers.
EMPTY_KEY = []byte{}
)
// EngineTable gives access to partition and repository metadata of an Engine
// and keeps an in-memory map of the partitions it has seen.
type EngineTable struct {
// engine is the owning engine whose partition operators and meta store are used.
engine *Engine
// parts maps a partition id to its partition; it is filled by
// recoverPartition and AddPartition and read by Partition.
parts map[int32]*model.Partition
}
// NewEngineTable creates an EngineTable bound to engine, starts it with an
// empty partition map and then fills that map via recoverPartition before
// returning it.
func NewEngineTable(engine *Engine) *EngineTable {
	t := new(EngineTable)
	t.engine = engine
	t.parts = map[int32]*model.Partition{}
	t.recoverPartition()
	return t
}
// NewIdOfNs builds the Identity for key using the system namespace
// (ENGINE_NAMESPACE_SYSTEM) and the namespaces table (ENGINE_TABLE_NAMESPACES).
func NewIdOfNs(key []byte) *Identity {
	id := NewIdentity(ENGINE_NAMESPACE_SYSTEM, ENGINE_TABLE_NAMESPACES, key)
	return id
}
// NewIdOfPart builds the Identity for key using the system namespace
// (ENGINE_NAMESPACE_SYSTEM) and the partitions table (ENGINE_TABLE_PARTITIONS).
func NewIdOfPart(key []byte) *Identity {
	id := NewIdentity(ENGINE_NAMESPACE_SYSTEM, ENGINE_TABLE_PARTITIONS, key)
	return id
}
// NewIdOfTab builds the Identity for key inside the tables table
// (ENGINE_TABLE_TABLES) of the given namespace.
func NewIdOfTab(namespace string, key []byte) *Identity {
	id := NewIdentity(namespace, ENGINE_TABLE_TABLES, key)
	return id
}
// NewIdOfData builds the Identity for key in the given namespace and table.
// It forwards its arguments to NewIdentity unchanged.
func NewIdOfData(namespace, table string, key []byte) *Identity {
	id := NewIdentity(namespace, table, key)
	return id
}
// recoverPartition fills p.parts from the engine's partition operators.
//
// It visits one operator per entry in the engine's configured partition list
// and records every non-nil partition under its Id; operators whose Part()
// is nil are skipped.
func (p *EngineTable) recoverPartition() {
	count := len(p.engine.config.Partitions)
	for idx := 0; idx < count; idx++ {
		part := p.engine.partitions[idx].Part()
		if part == nil {
			continue
		}
		p.parts[part.Id] = part
	}
}
// Repository reads the cluster repository metadata stored under
// ENGINE_KEY_META_REPOSITORY in the ENGINE_TABLE_CLUSTERS system table.
//
// A missing key (ErrDbKeyNotFound) is not treated as a failure: the
// repository value handed to SysGet is returned as-is. Any other error is
// logged together with its cause and nil is returned.
func (p *EngineTable) Repository() *model.Repository {
	repo := &model.Repository{}
	err := p.engine.meta.SysGet(ENGINE_TABLE_CLUSTERS, ENGINE_KEY_META_REPOSITORY, repo)
	if err != nil && err != ErrDbKeyNotFound {
		// Log the underlying error; callers only see nil, so this log line
		// is the only place the cause of the failure is recorded.
		log.Errorf("get cluster repository meta: %v", err)
		return nil
	}
	return repo
}
// RepositoryUpdate writes repository to the ENGINE_TABLE_CLUSTERS system table
// under ENGINE_KEY_META_REPOSITORY and returns the error reported by SysSet.
func (p *EngineTable) RepositoryUpdate(repository model.Repository) error {
	err := p.engine.meta.SysSet(ENGINE_TABLE_CLUSTERS, ENGINE_KEY_META_REPOSITORY, &repository)
	return err
}
// PartitionOfIndex returns the partition held by the engine's partition
// operator at partIndex.
//
// An index outside [0, len(p.parts)) falls back to index 0. Negative values
// are clamped as well, so they are never forwarded to engine.part.
// NOTE(review): the upper bound is the number of partitions known to this
// table, not the number of engine operators — confirm that is intended.
func (p *EngineTable) PartitionOfIndex(partIndex int32) *model.Partition {
	if partIndex < 0 || int(partIndex) >= len(p.parts) {
		partIndex = 0
	}
	return p.engine.part(partIndex).Part()
}
// Partition looks up the partition recorded under partId in p.parts.
// It returns nil when no partition with that id has been recorded.
func (p *EngineTable) Partition(partId int32) *model.Partition {
	return p.parts[partId]
}
// AddPartition hands part to the partition operator selected by part.Index
// via PartIfAbsent and, on success, records the partition it returns in
// p.parts under that partition's Id.
//
// If PartIfAbsent fails, its error is returned and p.parts is left untouched.
func (p *EngineTable) AddPartition(part model.Partition) error {
	stored, err := p.engine.part(part.Index).PartIfAbsent(part)
	if err == nil {
		p.parts[stored.Id] = stored
		return nil
	}
	return err
}