-
Notifications
You must be signed in to change notification settings - Fork 80
Expand file tree
/
Copy pathstartup.dos
More file actions
119 lines (119 loc) · 6.27 KB
/
startup.dos
File metadata and controls
119 lines (119 loc) · 6.27 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
//加载插件
try{loadPlugin("/DolphinDB/server/plugins/nsq/PluginNsq.txt")} catch(ex){print(ex)}
go
//连接行情服务器
nsq::connect("/DolphinDB/server/plugins/nsq/nsq_sdk_config.ini")
go
//获取行情数据的表结构
ordersSchema = nsq::getSchema(`orders)
tradeSchema = nsq::getSchema(`trade)
snapshotSchema = nsq::getSchema(`snapshot)
go
//登陆账号
login("admin", "123456")
//Level2行情数据存储数据库
dbName = "dfs://stockL2"
if(existsDatabase(dbName) == false){
dbDate = database("", VALUE, 2022.01.01..2022.12.01)
dbSymbol = database("", HASH, [SYMBOL, 20])
database(dbName, COMPO, [dbDate, dbSymbol], , "TSDB")
print("Database created successfully !")
}
else{
print("Database and table have been created !")
}
//上交所逐笔委托
tbName = "orders_sh"
if(existsTable(dbName, tbName) == false){
db = database(dbName)
db.createPartitionedTable(table=table(1:0, ordersSchema.name, ordersSchema.type), tableName=tbName, partitionColumns=`TradeDate`InstrumentID, sortColumns=`InstrumentID`TransactTime, keepDuplicates=ALL)
print("DFS table created successfully !")
}
else{
print("DFS table have been created !")
}
//深交所逐笔委托
tbName = "orders_sz"
if(existsTable(dbName, tbName) == false){
db = database(dbName)
db.createPartitionedTable(table=table(1:0, ordersSchema.name, ordersSchema.type), tableName=tbName, partitionColumns=`TradeDate`InstrumentID, sortColumns=`InstrumentID`TransactTime, keepDuplicates=ALL)
print("DFS table created successfully !")
}
else{
print("DFS table have been created !")
}
//上交所逐笔成交
tbName = "trade_sh"
if(existsTable(dbName, tbName) == false){
db = database(dbName)
db.createPartitionedTable(table=table(1:0, tradeSchema.name, tradeSchema.type), tableName=tbName, partitionColumns=`Tradedate`InstrumentID, sortColumns=`InstrumentID`TransactTime, keepDuplicates=ALL)
print("DFS table created successfully !")
}
else{
print("DFS table have been created !")
}
//深交所逐笔成交
tbName = "trade_sz"
if(existsTable(dbName, tbName) == false){
db = database(dbName)
db.createPartitionedTable(table=table(1:0, tradeSchema.name, tradeSchema.type), tableName=tbName, partitionColumns=`Tradedate`InstrumentID, sortColumns=`InstrumentID`TransactTime, keepDuplicates=ALL)
print("DFS table created successfully !")
}
else{
print("DFS table have been created !")
}
//上交所L2快照
tbName = "snapshot_sh"
if(existsTable(dbName, tbName) == false){
db = database(dbName)
db.createPartitionedTable(table(1:0, snapshotSchema.name, snapshotSchema.type), tbName, `TradeDate`InstrumentID, sortColumns=`InstrumentID`UpdateTime, keepDuplicates=ALL)
print("DFS table created successfully !")
}
else{
print("DFS table have been created !")
}
//深交所L2快照
tbName = "snapshot_sz"
if(existsTable(dbName, tbName) == false){
db = database(dbName)
db.createPartitionedTable(table(1:0, snapshotSchema.name, snapshotSchema.type), tbName, `TradeDate`InstrumentID, sortColumns=`InstrumentID`UpdateTime, keepDuplicates=ALL)
print("DFS table created successfully !")
}
else{
print("DFS table have been created !")
}
go
//创建共享的异步持久化流数据表:上交所逐笔委托
enableTableShareAndPersistence(table=streamTable(1:0, ordersSchema.name, ordersSchema.type), tableName=`orders_sh_stream, cacheSize=1000000, preCache=1000)
//创建共享的异步持久化流数据表:深交所逐笔委托
enableTableShareAndPersistence(table=streamTable(1:0, ordersSchema.name, ordersSchema.type), tableName=`orders_sz_stream, cacheSize=1000000, preCache=1000)
//创建共享的异步持久化流数据表:上交所逐笔成交
enableTableShareAndPersistence(table=streamTable(1:0, tradeSchema.name, tradeSchema.type), tableName=`trade_sh_stream, cacheSize=1000000, preCache=1000)
//创建共享的异步持久化流数据表:深交所逐笔成交
enableTableShareAndPersistence(table=streamTable(1:0, tradeSchema.name, tradeSchema.type), tableName=`trade_sz_stream, cacheSize=1000000, preCache=1000)
//创建共享的异步持久化流数据表:上交所L2快照
enableTableShareAndPersistence(table=streamTable(1:0, snapshotSchema.name, snapshotSchema.type), tableName=`snapshot_sh_stream, cacheSize=1000000, preCache=1000)
//创建共享的异步持久化流数据表:深交所L2快照
enableTableShareAndPersistence(table=streamTable(1:0, snapshotSchema.name, snapshotSchema.type), tableName=`snapshot_sz_stream, cacheSize=1000000, preCache=1000)
go
//创建实时增量数据落库的订阅:上交所逐笔委托
subscribeTable(tableName="orders_sh_stream", actionName="saveOrdersShToDFS", offset=-1, handler=loadTable("dfs://stockL2", "orders_sh"), msgAsTable=true, batchSize=10000, throttle=1, reconnect = true)
//创建实时增量数据落库的订阅:深交所逐笔委托
subscribeTable(tableName="orders_sz_stream", actionName="saveOrdersSzToDFS", offset=-1, handler=loadTable("dfs://stockL2", "orders_sz"), msgAsTable=true, batchSize=10000, throttle=1, reconnect = true)
//创建实时增量数据落库的订阅:上交所逐笔成交
subscribeTable(tableName="trade_sh_stream", actionName="saveTradeShToDFS", offset=-1, handler=loadTable("dfs://stockL2", "trade_sh"), msgAsTable=true, batchSize=10000, throttle=1, reconnect = true)
//创建实时增量数据落库的订阅:深交所逐笔成交
subscribeTable(tableName="trade_sz_stream", actionName="saveTradeSzToDFS", offset=-1, handler=loadTable("dfs://stockL2", "trade_sz"), msgAsTable=true, batchSize=10000, throttle=1, reconnect = true)
//创建实时增量数据落库的订阅:上交所L2快照
subscribeTable(tableName="snapshot_sh_stream", actionName="saveSnapshotShToDFS", offset=-1, handler=loadTable("dfs://stockL2", "snapshot_sh"), msgAsTable=true, batchSize=10000, throttle=1, reconnect = true)
//创建实时增量数据落库的订阅:深交所L2快照
subscribeTable(tableName="snapshot_sz_stream", actionName="saveSnapshotSzToDFS", offset=-1, handler=loadTable("dfs://stockL2", "snapshot_sz"), msgAsTable=true, batchSize=10000, throttle=1, reconnect = true)
go
//NSQ 插件接入实时行情
nsq::subscribe(`orders, `sh, orders_sh_stream)
nsq::subscribe(`orders, `sz, orders_sz_stream)
nsq::subscribe(`trade, `sh, trade_sh_stream)
nsq::subscribe(`trade, `sz, trade_sz_stream)
nsq::subscribe(`snapshot, `sh, snapshot_sh_stream)
nsq::subscribe(`snapshot, `sz, snapshot_sz_stream)
writeLog("NSQ subscribe Successfully !")