-
Notifications
You must be signed in to change notification settings - Fork 80
Expand file tree
/
Copy path02.createEngineSub.txt
More file actions
95 lines (89 loc) · 3.89 KB
/
02.createEngineSub.txt
File metadata and controls
95 lines (89 loc) · 3.89 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
/**
createEngineSub.txt
Script to register the stream computing engines and subscribe them to the stream tables
DolphinDB Inc.
DolphinDB server version: 1.30.18 2022.05.09/2.00.6 2022.05.09
Last modification time: 2022.05.31
*/
/*
 * Classify an order by quantity into a size flag:
 *   0 (small)  : qty <= 20000
 *   1 (medium) : 20000 < qty <= 200000
 *   2 (large)  : qty > 200000
 * The outer iif already handles qty <= 20000, so the inner iif only
 * needs to test the upper bound of the medium band.
 */
@state
def tagFunc(qty){
	return iif(qty <= 20000, 0, iif(qty <= 200000, 1, 2))
}
/*
 * Create `parallel` reactive state engines named processBuyOrder1..processBuyOrder<parallel>,
 * keyed by (SecurityID, BuyNum), and subscribe each one to the tradeOriginalStream table.
 * Engine i writes into the engine named processSellOrder<i>, which is looked up by name
 * and therefore has to exist before this function is called.
 */
def processBuyOrderFunc(parallel){
	// Pass-through columns first, then the running amount, its size flag, and their previous values.
	buyMetrics = [<TradeTime>, <SellNum>, <TradeAmount>, <TradeQty>, <cumsum(TradeAmount)>, <tagFunc(cumsum(TradeQty))>, <prev(cumsum(TradeAmount))>, <prev(tagFunc(cumsum(TradeQty)))>]
	buyKeys = `SecurityID`BuyNum
	for(idx in 1..parallel){
		suffix = string(idx)
		buyEngineName = "processBuyOrder" + suffix
		downstreamEngine = getStreamEngine("processSellOrder" + suffix)
		createReactiveStateEngine(name = buyEngineName, metrics = buyMetrics, dummyTable = tradeOriginalStream, outputTable = downstreamEngine, keyColumn = buyKeys, keepOrder = true)
		// NOTE(review): filter = (parallel, idx - 1) presumably selects one hash bucket of the table's filter column per subscription - confirm.
		subscribeTable(tableName = "tradeOriginalStream", actionName = buyEngineName, offset = -1, handler = getStreamEngine(buyEngineName), msgAsTable = true, hash = idx, filter = (parallel, idx - 1))
	}
}
/*
 * Create `parallel` reactive state engines named processSellOrder1..processSellOrder<parallel>,
 * keyed by (SecurityID, SellNum). Their input schema is an empty in-memory table built here.
 * Engine i writes into the engine named processCapitalFlow<i>, which is looked up by name
 * and therefore has to exist before this function is called.
 */
def processSellOrderFunc(parallel){
	// Empty table that only supplies the input schema of the sell-side engines.
	inputNames = `SecurityID`BuyNum`TradeTime`SellNum`TradeAmount`TradeQty`TotalBuyAmount`BuyOrderFlag`PrevTotalBuyAmount`PrevBuyOrderFlag
	inputTypes = [SYMBOL, INT, TIMESTAMP, INT, DOUBLE, INT, DOUBLE, INT, DOUBLE, INT]
	buySideSchema = table(1:0, inputNames, inputTypes)
	// Sell-side running totals and flags, followed by the buy-side columns carried through unchanged.
	sellMetrics = [<TradeTime>, <TradeAmount>, <cumsum(TradeAmount)>, <tagFunc(cumsum(TradeQty))>, <prev(cumsum(TradeAmount))>, <prev(tagFunc(cumsum(TradeQty)))>, <BuyNum>, <TotalBuyAmount>, <BuyOrderFlag>, <PrevTotalBuyAmount>, <PrevBuyOrderFlag>]
	sellKeys = `SecurityID`SellNum
	for(idx in 1..parallel){
		suffix = string(idx)
		downstreamEngine = getStreamEngine("processCapitalFlow" + suffix)
		createReactiveStateEngine(name = "processSellOrder" + suffix, metrics = sellMetrics, dummyTable = buySideSchema, outputTable = downstreamEngine, keyColumn = sellKeys, keepOrder = true)
	}
}
/*
 * Create `parallel` reactive state engines named processCapitalFlow1..processCapitalFlow<parallel>,
 * keyed by SecurityID. Their input schema is an empty in-memory table built here, and every
 * engine writes to the capitalFlowStream table.
 */
def processCapitalFlowFunc(parallel){
	// Empty table that only supplies the input schema of the capital-flow engines.
	inputNames = `SecurityID`SellNum`TradeTime`TradeAmount`TotalSellAmount`SellOrderFlag`PrevTotalSellAmount`PrevSellOrderFlag`BuyNum`TotalBuyAmount`BuyOrderFlag`PrevTotalBuyAmount`PrevBuyOrderFlag
	inputTypes = [SYMBOL, INT, TIMESTAMP, DOUBLE, DOUBLE, INT, DOUBLE, INT, INT, DOUBLE, INT, DOUBLE, INT]
	sellSideSchema = table(1:0, inputNames, inputTypes)
	// Sell side and buy side each get a grouped amount sum and a grouped count; 3 is the group count argument.
	sellAmountByFlag = <dynamicGroupCumsum(TotalSellAmount, PrevTotalSellAmount, SellOrderFlag, PrevSellOrderFlag, 3)>
	sellCountByFlag = <dynamicGroupCumcount(SellOrderFlag, PrevSellOrderFlag, 3)>
	buyAmountByFlag = <dynamicGroupCumsum(TotalBuyAmount, PrevTotalBuyAmount, BuyOrderFlag, PrevBuyOrderFlag, 3)>
	buyCountByFlag = <dynamicGroupCumcount(BuyOrderFlag, PrevBuyOrderFlag, 3)>
	flowMetrics = [<TradeTime>, <cumsum(TradeAmount)>, sellAmountByFlag, sellCountByFlag, buyAmountByFlag, buyCountByFlag]
	for(idx in 1..parallel){
		createReactiveStateEngine(name = "processCapitalFlow" + string(idx), metrics = flowMetrics, dummyTable = sellSideSchema, outputTable = capitalFlowStream, keyColumn = `SecurityID, keepOrder = true)
	}
}
/*
 * Create the daily time-series engine "processCapitalFlow60min", which takes the last value of
 * each capital-flow column per SecurityID over 60-minute windows of TradeTime, reading the
 * schema of capitalFlowStream and writing to capitalFlowStream60min. Then subscribe the
 * engine to the capitalFlowStream table.
 */
def processCapitalFlow60minFunc(){
	engineName = "processCapitalFlow60min"
	// Window size and step are both 60 minutes, expressed as 60000*60.
	hourSpan = 60000*60
	lastValueMetrics = <[
		last(TotalAmount),
		last(SellSmallAmount), last(SellMediumAmount), last(SellBigAmount),
		last(SellSmallCount), last(SellMediumCount), last(SellBigCount),
		last(BuySmallAmount), last(BuyMediumAmount), last(BuyBigAmount),
		last(BuySmallCount), last(BuyMediumCount), last(BuyBigCount)]>
	createDailyTimeSeriesEngine(name = engineName, windowSize = hourSpan, step = hourSpan, metrics = lastValueMetrics, dummyTable = capitalFlowStream, outputTable = capitalFlowStream60min, timeColumn = "TradeTime", useSystemTime = false, keyColumn = `SecurityID, useWindowStartTime = false)
	subscribeTable(tableName = "capitalFlowStream", actionName = engineName, offset = -1, handler = getStreamEngine(engineName), msgAsTable = true, batchSize = 10000, throttle = 1, hash = 0)
}
// Number of engines created per stage; passed to each creation function below.
parallel = 3
// Stages are created downstream-first: each stage fetches the next stage's engines
// by name with getStreamEngine, so those engines must already exist.
processCapitalFlowFunc(parallel)
// NOTE(review): `go` presumably forces the statements above to execute before the
// following ones are parsed - confirm against the DolphinDB documentation.
go
// The sell-order engines output into the processCapitalFlow<i> engines created above.
processSellOrderFunc(parallel)
go
// The buy-order engines output into the processSellOrder<i> engines and subscribe to tradeOriginalStream.
processBuyOrderFunc(parallel)
// The 60-minute aggregation engine subscribes to capitalFlowStream.
processCapitalFlow60minFunc()