-
Notifications
You must be signed in to change notification settings - Fork 80
Expand file tree
/
Copy pathappendix_4.1.2_streamComputationOfSmallInflowRate_main.dos
More file actions
109 lines (97 loc) · 4.29 KB
/
appendix_4.1.2_streamComputationOfSmallInflowRate_main.dos
File metadata and controls
109 lines (97 loc) · 4.29 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
def createResultTable(){
return table(
array(SYMBOL, 0) as SecurityID,
array(DATETIME, 0) as TradeTime,
array(DOUBLE, 0) as smallBuyOrderAmount,
array(DOUBLE, 0) as smallSellOrderAmount,
array(DOUBLE, 0) as totalOrderAmount,
array(DOUBLE, 0) as factor)
}
def createTradeSchema(){
return table(
array(SYMBOL, 0) as SecurityID,
array(INT, 0) as BuyNo,
array(INT, 0) as SellNo,
array(DATETIME, 0) as TradeTime,
array(DOUBLE, 0) as TradeAmount)
}
def createResult1Schema() {
return table(
array(INT, 0) as BuyNo,
array(SYMBOL, 0) as SecurityID,
array(INT, 0) as SellNo,
array(DATETIME, 0) as TradeTime,
array(DOUBLE, 0) as TradeAmount,
array(DOUBLE, 0) as BuyCumAmount,
array(DOUBLE, 0) as PrevBuyCumAmount,
array(INT, 0) as BuyOrderFlag,
array(INT, 0) as PrevBuyOrderFlag)
}
def createResult2Schema() {
return table(
array(INT, 0) as SellNo,
array(INT, 0) as BuyNo,
array(SYMBOL, 0) as SecurityID,
array(DATETIME, 0) as TradeTime,
array(DOUBLE, 0) as TradeAmount,
array(DOUBLE, 0) as BuyCumAmount,
array(DOUBLE, 0) as PrevBuyCumAmount,
array(INT, 0) as BuyOrderFlag,
array(INT, 0) as PrevBuyOrderFlag,
array(DOUBLE, 0) as SellCumAmount,
array(DOUBLE, 0) as PrevSellCumAmount,
array(INT, 0) as SellOrderFlag,
array(INT, 0) as PrevSellOrderFlag)
}
def cleanStreamEngines(engineNames){
for(name in engineNames){
try{
dropStreamEngine(name)
}
catch(ex){}
}
}
@state
def factorOrderCumAmount(tradeAmount){
cumsumTradeAmount = cumsum(tradeAmount)
prevCumsumTradeAmount = prev(cumsumTradeAmount)
orderFlag = iif(cumsumTradeAmount<100000, 0, 1)
prevOrderFlag = prev(orderFlag)
return cumsumTradeAmount, prevCumsumTradeAmount, orderFlag, prevOrderFlag
}
@state
def factorSmallOrderNetAmountRatio(tradeAmount, sellCumAmount, sellOrderFlag, prevSellCumAmount, prevSellOrderFlag, buyCumAmount, buyOrderFlag, prevBuyCumAmount, prevBuyOrderFlag){
cumsumTradeAmount = cumsum(tradeAmount)
smallSellCumAmount, bigSellCumAmount = dynamicGroupCumsum(sellCumAmount, prevSellCumAmount, sellOrderFlag, prevSellOrderFlag, 2)
smallBuyCumAmount, bigBuyCumAmount = dynamicGroupCumsum(buyCumAmount, prevBuyCumAmount, buyOrderFlag, prevBuyOrderFlag, 2)
f = (smallBuyCumAmount - smallSellCumAmount) \ cumsumTradeAmount
return smallBuyCumAmount, smallSellCumAmount, cumsumTradeAmount, f
}
def createStreamEngine(result){
tradeSchema = createTradeSchema()
result1Schema = createResult1Schema()
result2Schema = createResult2Schema()
engineNames = ["rse1", "rse2", "rse3"]
cleanStreamEngines(engineNames)
metrics3 = <[TradeTime, factorSmallOrderNetAmountRatio(tradeAmount, sellCumAmount, sellOrderFlag, prevSellCumAmount, prevSellOrderFlag, buyCumAmount, buyOrderFlag, prevBuyCumAmount, prevBuyOrderFlag)]>
rse3 = createReactiveStateEngine(name=engineNames[2], metrics=metrics3, dummyTable=result2Schema, outputTable=result, keyColumn="SecurityID")
metrics2 = <[BuyNo, SecurityID, TradeTime, TradeAmount, BuyCumAmount, PrevBuyCumAmount, BuyOrderFlag, PrevBuyOrderFlag, factorOrderCumAmount(TradeAmount)]>
rse2 = createReactiveStateEngine(name=engineNames[1], metrics=metrics2, dummyTable=result1Schema, outputTable=rse3, keyColumn="SellNo")
metrics1 = <[SecurityID, SellNo, TradeTime, TradeAmount, factorOrderCumAmount(TradeAmount)]>
return createReactiveStateEngine(name=engineNames[0], metrics=metrics1, dummyTable=tradeSchema, outputTable=rse2, keyColumn="BuyNo")
}
result = createResultTable()
rse = createStreamEngine(result)
insert into rse values(`000155, 1000, 1001, 2020.01.01T09:30:00, 20000)
insert into rse values(`000155, 1000, 1002, 2020.01.01T09:30:01, 40000)
insert into rse values(`000155, 1000, 1003, 2020.01.01T09:30:02, 60000)
insert into rse values(`000155, 1004, 1003, 2020.01.01T09:30:03, 30000)
select * from result
/*
SecurityID TradeTime smallBuyOrderAmount smallSellOrderAmount totalOrderAmount factor
---------- ------------------- ------------------- -------------------- ---------------- ------
000155 2020.01.01T09:30:00 20000 20000 20000 0
000155 2020.01.01T09:30:01 60000 60000 60000 0
000155 2020.01.01T09:30:02 0 120000 120000 -1
000155 2020.01.01T09:30:03 30000 150000 150000 -0.8
*/