-
Notifications
You must be signed in to change notification settings - Fork 80
Expand file tree
/
Copy path3. IOPV_realtime_mult.dos
More file actions
136 lines (111 loc) · 5.54 KB
/
3. IOPV_realtime_mult.dos
File metadata and controls
136 lines (111 loc) · 5.54 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
/*
* 程序名: 多只ETF实时 IOPV增量计算
* 作者: DolphinDB
* 时间: 2022-06-15
* 程序说明: 本程序为 DolphinDB 单只 ETF 基金 IOPV 实时计算
* 基金份额参考净值(原始公式)= {∑(pi*sharesi) * 10 + AI + ECC} /10000
* 基金份额参考净值(本案例公式)= {∑(pi*sharesi) /1000}
*/
//clean up environment
def cleanEnvironment(){
try { unsubscribeTable(tableName="tradeOriginalStream", actionName="tradeProcess") } catch(ex) { print(ex) }
try { unsubscribeTable(tableName="tradeProcessStream", actionName="IOPVResult") } catch(ex) { print(ex) }
try { dropStreamEngine("tradeProcessPriceChange") } catch(ex) { print(ex) }
try { dropStreamEngine("tradeProcessIOPVChange") } catch(ex) { print(ex) }
try { dropStreamEngine("IOPVResult") } catch(ex) { print(ex) }
try{ dropStreamTable(`tradeOriginalStream) } catch(ex){ print(ex) }
try{ dropStreamTable(`IOPVResult) } catch(ex){ print(ex) }
try{ undef(`staticInof, SHARED) } catch(ex){ print(ex) }
undef all
}
cleanEnvironment()
def getBasketData(allSymbol, n){
return loop(x->table(take(x, 50) as BasketID, rand(allSymbol, 50) as SecurityID, rand(76339..145256, 50) as Vol), 1..n).unionAll(false)
}
def createStreamTableFunc(){
login("admin","123456")
trade = loadTable("dfs://LEVEL2_SZ","Trade")
allSyms = select count(*) from trade where date(tradetime) = 2020.01.02 group by SecurityID
basket = getBasketData(allSyms.SecurityID, 100)
share(basket, `staticInof)
//create stream table: tradeOriginalStream
colName = `SecurityID`tradetime`Price
colType = [SYMBOL, TIMESTAMP, DOUBLE]
tradeOriginalStreamTemp = streamTable(20000000:0, colName, colType)
try{ enableTableShareAndPersistence(table=tradeOriginalStreamTemp, tableName="tradeOriginalStream", asynWrite=true, compress=true, cacheSize=20000000, retentionMinutes=1440, flushMode=0, preCache=10000) }
catch(ex){ print(ex) }
undef("tradeOriginalStreamTemp")
//create stream table: IOPVResult
colName = `BasketID`tradetime`IOPV
colType = [INT, TIMESTAMP, DOUBLE]
IOPVResultTemp = streamTable(20000000:0, colName, colType)
try{ enableTableShareAndPersistence(table=IOPVResultTemp, tableName="IOPVResult", asynWrite=true, compress=true, cacheSize=20000000, retentionMinutes=1440, flushMode=0, preCache=10000) }
catch(ex){ print(ex) }
undef("IOPVResultTemp")
}
createStreamTableFunc()
go
def tradeProcess(){
metricsFuc = [
<tradetime>,
<Price>]
createReactiveStateEngine(name="tradeProcessPriceChange", metrics=metricsFuc, dummyTable=tradeOriginalStream, outputTable=tradeOriginalStream, keyColumn=`SecurityID, filter=<deltas(Price) != 0>, keepOrder=true)
colName = `SecurityID`tradetime`Price`BasketID`Vol
colType = [SYMBOL, TIMESTAMP, DOUBLE, INT, INT]
tradeProcessDummy = table(1:0, colName, colType)
metricsProcess = [
<tradetime>,
<deltas(Price*Vol/1000)>]
createReactiveStateEngine(name="tradeProcessIOPVChange", metrics=metricsProcess, dummyTable=tradeProcessDummy, outputTable=getStreamEngine(`IOPVResult), keyColumn=`BasketID`SecurityID, keepOrder=true)
}
def tradeResult(){
colName =`BasketID `SecurityID`tradetime`deltaValue
colType = [INT, SYMBOL, TIMESTAMP, DOUBLE]
tradeResultDummy = table(1:0, colName, colType)
metricsResult = [
<tradetime>,
<cumsum(deltaValue)>]
createReactiveStateEngine(name="IOPVResult", metrics=metricsResult, dummyTable=tradeResultDummy, outputTable=IOPVResult, keyColumn=`BasketID, keepOrder=true)
}
tradeResult()
go
tradeProcess()
go
def tradeProcess(msg){
temp = select * from lj(msg, staticInof, `SecurityID) where BasketID != NULL
getStreamEngine(`tradeProcessIOPVChange).append!(temp)
}
subscribeTable(tableName="tradeOriginalStream", actionName="tradeProcess", offset=-1, handler=tradeProcess, msgAsTable=true, hash=0)
priceData = select last(tradetime) as tradetime, last(Price) as Price from loadTable("dfs://LEVEL2_SZ", "Trade") where tradetime>=2020.01.02T09:15:00.000, tradetime<2020.01.02T09:30:00.000 group by SecurityID order by SecurityID
warmData1 = lj(priceData, staticInof, `SecurityID)
warmupStreamEngine(getStreamEngine(`tradeProcessIOPVChange), warmData1)
warmData2 = select last(tradetime) as tradetime, sum(Price*Vol) as IOPV from lj(priceData, staticInof, `SecurityID) where BasketID != NULL group by BasketID, SecurityID order by BasketID
warmupStreamEngine(getStreamEngine(`IOPVResult), warmData2)
t = select SecurityID, tradetime, Price from loadTable("dfs://LEVEL2_SZ", "Trade") where date(tradetime)=2020.01.02, tradetime>=2020.01.02T09:30:00.000 order by tradetime, SecurityID
submitJob("replay_trade", "trade", replay{t, getStreamEngine(`tradeProcessPriceChange), `tradetime, `tradetime, 10000, true, 1})
select * from getRecentJobs() where endTime.isNull()
getStreamingStat().subWorkers
/**
//运行监控观察
streamStateQuery.txt
Script to query stream compute state
DolphinDB Inc.
DolphinDB server version: 1.30.18 2022.05.09/2.00.6 2022.05.09
Last modification time: 2022.05.31
//query register subscription information
getStreamingStat().pubTables
//query publish queue
getStreamingStat().pubConns
//query subscription consumption information
getStreamingStat().subWorkers
//query register stream computing engine
getStreamEngineStat()
//query register stream computing engine, the type is ReactiveStreamEngine
getStreamEngineStat().ReactiveStreamEngine
//query register stream computing engine, the type is DailyTimeSeriesEngine
getStreamEngineStat().DailyTimeSeriesEngine
for(i in 1..100){
print(select BasketID, IOPV from IOPVResult where BasketID in 1 2 3 context by BasketID order by BasketID limit -1 )
sleep(500)
}
*/