-
Notifications
You must be signed in to change notification settings - Fork 80
Expand file tree
/
Copy path延时订单因子流式实现.dos
More file actions
87 lines (70 loc) · 5.01 KB
/
延时订单因子流式实现.dos
File metadata and controls
87 lines (70 loc) · 5.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
/*
 * Streaming implementation of the delayed-order factor (延时订单因子流式实现)
 */
// ---- Clean-up: remove objects left over from a previous run so the script can be re-executed ----
// Every call is wrapped in try/catch and only prints the exception, so a missing
// object (e.g. on the very first run) does not abort the script.
// Some names below (second_reactiveDemo, DelayedTraderByApplSeqNum, firstReactiveresult)
// are not created by this script; presumably they belong to earlier or sibling versions — kept as-is.

// 1) Cancel subscriptions first, so that no handler is attached to the engines/tables dropped below.
try{unsubscribeTable(tableName="tradeTable", actionName="Bid")}catch(ex){ print(ex) }
try{unsubscribeTable(tableName="entrustTable", actionName="Bid")}catch(ex){ print(ex) }
try{unsubscribeTable(tableName="tradeTable", actionName="Offer")}catch(ex){ print(ex) }
try{unsubscribeTable(tableName="entrustTable", actionName="Offer")}catch(ex){ print(ex) }
try{unsubscribeTable(tableName=`lsjoutput, actionName="DelayedTraderByApplSeqNum")}catch(ex){ print(ex) }
try{unsubscribeTable(tableName=`lsjoutput, actionName="DelayedTrader")}catch(ex){ print(ex) }
try{unsubscribeTable(tableName="RSEresult", actionName="TSengine")}catch(ex){ print(ex) }

// 2) Drop the stream engines.
try{dropAggregator("lsjEngineBid")}catch(ex){ print(ex) }
try{dropAggregator("lsjEngineOffer")}catch(ex){ print(ex) }
try{dropAggregator("second_reactiveDemo")}catch(ex){ print(ex) }
try{dropAggregator("reactiveDemo")}catch(ex){ print(ex) }
try{dropAggregator("TSengine")}catch(ex){ print(ex) }

// 3) Drop the shared stream tables.
try{dropStreamTable(`tradeTable)}catch(ex){ print(ex) }
try{dropStreamTable(`entrustTable)}catch(ex){ print(ex) }
try{dropStreamTable(`lsjoutput)}catch(ex){ print(ex) }
try{dropStreamTable(`firstReactiveresult)}catch(ex){ print(ex) }
// RSEresult is shared further down in this script but was never dropped here,
// so it (and its old rows) survived from one run to the next.
try{dropStreamTable(`RSEresult)}catch(ex){ print(ex) }
try{dropStreamTable(`result)}catch(ex){ print(ex) }
// Run the clean-up as its own batch before the rest of the script is parsed and executed.
go
// Load the sample data that is later appended to the stream tables:
// rows dated 2022.04.14 with DateTime after 09:30:00.000 for securities 600000.SH and 000001.SZ.
// Trades are additionally restricted to tradePrice > 0.
// NOTE(review): this filter spells the column `tradePrice`, while the engine metrics below use `TradePrice` — confirm the column's actual spelling.
trade = select * from loadTable("dfs://level_2", "trade") where date(DateTime) = 2022.04.14 and SecurityID in [`600000.SH, `000001.SZ] and tradePrice > 0 and DateTime > 2022.04.14T09:30:00.000
entrust = select * from loadTable("dfs://level_2", "entrust") where date(DateTime) = 2022.04.14 and SecurityID in [`600000.SH, `000001.SZ] and DateTime > 2022.04.14T09:30:00.000
// ---- Shared stream tables ----
// Input tables: empty stream tables (initial capacity 5,000,000 rows) that reuse the
// column names and types of the loaded `trade` / `entrust` tables.
share streamTable(5000000:0, schema(trade).colDefs.name, schema(trade).colDefs.typeString) as tradeTable
share streamTable(5000000:0, schema(entrust).colDefs.name, schema(entrust).colDefs.typeString) as entrustTable
// Output of the two left-semi-join engines (layer 1).
share streamTable(5000000:0, ["code", "ApplSeqNum", "TradeTime", "OrderTime", "TradePrice", "TradeQty", "DelayedTradeFlag", "BuySellFlag", "cumTradeQty"], [SYMBOL, INT, TIMESTAMP, TIMESTAMP, DOUBLE, LONG, INT, SYMBOL, LONG]) as lsjoutput
// Output of the reactive state engine (layer 2).
share streamTable(5000000:0, ["code", "TradeTime", "DelayedTradeBuyOrderNum", "DelayedTradeSellOrderNum", "DelayedTradeBuyOrderQty", "DelayedTradeSellOrderQty"], [SYMBOL, TIMESTAMP, LONG, LONG, LONG, LONG]) as RSEresult
// Output of the time-series engine (layer 3); the time column comes first here.
share streamTable(5000000:0, ["TradeTime", "code", "DelayedTradeBuyOrderNum", "DelayedTradeSellOrderNum", "DelayedTradeBuyOrderQty", "DelayedTradeSellOrderQty"], [TIMESTAMP, SYMBOL, LONG, LONG, LONG, LONG]) as result
// ---- Stream engines ----
// Layer 1: two left-semi-join engines join trades (left) with orders (right) and both write to lsjoutput.
//   lsjEngineBid   matches (SecurityID, BidApplSeqNum)   of a trade with (SecurityID, ApplSeqNum) of an order
//   lsjEngineOffer matches (SecurityID, OfferApplSeqNum) of a trade with (SecurityID, ApplSeqNum) of an order
go
// Metrics shared by both engines, in order: trade time, order time, trade price, trade quantity,
// running count of rows whose trade time exceeds the order time by more than 60000
// (presumably milliseconds, i.e. one minute — confirm the DateTime type), Side, running sum of TradeQty.
// NOTE(review): these seven metrics appear to fill the lsjoutput columns that follow the
// two matching columns — confirm against the engine documentation.
metrics = [
    <tradeTable.DateTime>,
    <entrustTable.DateTime>,
    <TradePrice>,
    <TradeQty>,
    <cumsum(iif((tradeTable.DateTime-entrustTable.DateTime)>60000,1,0)) as DelayedTraderflag>,
    <Side>,
    <cumsum(TradeQty)>
]

// Buy side: join on the trade's BidApplSeqNum.
lsjEngineBid = createLeftSemiJoinEngine(
    name="lsjEngineBid",
    leftTable=tradeTable,
    rightTable=entrustTable,
    outputTable=lsjoutput,
    metrics=metrics,
    matchingColumn=[[`SecurityID, `BidApplSeqNum], [`SecurityID, `ApplSeqNum]],
    garbageSize=50000000,
    updateRightTable=true)
// appendForJoin's second argument selects the side being fed: true = left (trades), false = right (orders).
subscribeTable(tableName="tradeTable", actionName="Bid", offset=0, handler=appendForJoin{lsjEngineBid, true}, msgAsTable=true)
subscribeTable(tableName="entrustTable", actionName="Bid", offset=0, handler=appendForJoin{lsjEngineBid, false}, msgAsTable=true)

// Sell side: same engine set-up, joined on the trade's OfferApplSeqNum.
lsjEngineOffer = createLeftSemiJoinEngine(
    name="lsjEngineOffer",
    leftTable=tradeTable,
    rightTable=entrustTable,
    outputTable=lsjoutput,
    metrics=metrics,
    matchingColumn=[[`SecurityID, `OfferApplSeqNum], [`SecurityID, `ApplSeqNum]],
    garbageSize=50000000,
    updateRightTable=true)
subscribeTable(tableName="tradeTable", actionName="Offer", offset=0, handler=appendForJoin{lsjEngineOffer, true}, msgAsTable=true)
subscribeTable(tableName="entrustTable", actionName="Offer", offset=0, handler=appendForJoin{lsjEngineOffer, false}, msgAsTable=true)
// Layer 2: state functions used by the reactive state engine.

// Running total of `flag` over the rows where bsFlag equals `side` and flag <= 1;
// every other row contributes 0.
//   bsFlag : per-row side value, compared against `side`
//   flag   : per-row flag value (the engine passes the DelayedTradeFlag column)
//   side   : side to count ("B" or "S" in this script)
@state
def delayedTradeNum(bsFlag, flag, side){
    contribution = iif(bsFlag==side && flag<=1, flag, 0)
    return cumsum(contribution)
}
// Running total of a per-row quantity for the rows where bsFlag equals `side`:
//   flag > 1  -> the row contributes tradeQty
//   flag == 1 -> the row contributes cumTradeQty
//   otherwise (including rows of the other side) -> 0
//   bsFlag      : per-row side value, compared against `side`
//   flag        : per-row flag value (the engine passes the DelayedTradeFlag column)
//   tradeQty    : per-row trade quantity
//   cumTradeQty : per-row cumulative trade quantity
//   side        : side to accumulate ("B" or "S" in this script)
@state
def delayedTradeQty(bsFlag, flag, tradeQty, cumTradeQty, side){
    qtyWhenFlagIsOne = iif(bsFlag==side && flag==1, cumTradeQty, 0)
    qtyPerRow = iif(bsFlag==side && flag>1, tradeQty, qtyWhenFlagIsOne)
    return cumsum(qtyPerRow)
}
// Layer 2 engine: reactive state engine keyed by `code`, reading lsjoutput and writing RSEresult.
// Metrics, in order: TradeTime, then order count and quantity for side "B" and side "S".
metrics = [
    <TradeTime>,
    <delayedTradeNum(BuySellFlag,DelayedTradeFlag,"B")>,
    <delayedTradeNum(BuySellFlag,DelayedTradeFlag,"S")>,
    <delayedTradeQty(BuySellFlag,DelayedTradeFlag,TradeQty,cumTradeQty,"B")>,
    <delayedTradeQty(BuySellFlag,DelayedTradeFlag,TradeQty,cumTradeQty,"S")>
]
// The filter condition is TradePrice > 0.
secondrse = createReactiveStateEngine(
    name="reactiveDemo",
    metrics=metrics,
    dummyTable=lsjoutput,
    outputTable=RSEresult,
    keyColumn=["code"],
    filter=<TradePrice>0>)
// Unlike the other subscriptions in this script, this one passes neither offset nor msgAsTable.
subscribeTable(tableName=`lsjoutput, actionName="DelayedTrader", handler=tableInsert{secondrse})
// Layer 3: time-series engine with windowSize = step = 60000 on TradeTime, keyed by `code`.
// It takes the last value of each RSEresult metric per window and writes to `result`.
tsengine = createTimeSeriesEngine(
    name="TSengine",
    windowSize=60000,
    step=60000,
    metrics=<[last(DelayedTradeBuyOrderNum),last(DelayedTradeSellOrderNum),last(DelayedTradeBuyOrderQty),last(DelayedTradeSellOrderQty)]>,
    dummyTable=RSEresult,
    outputTable=result,
    timeColumn=`TradeTime,
    useSystemTime=false,
    keyColumn=`code,
    garbageSize=50,
    useWindowStartTime=false)
subscribeTable(tableName="RSEresult", actionName="TSengine", offset=0, handler=append!{tsengine}, msgAsTable=true)
// ---- Feed the loaded data into the input stream tables ----
append!(tradeTable, trade)
append!(entrustTable, entrust)
// For every code, select the row(s) of `result` whose TradeTime is that code's maximum;
// `limit -1` under `context by` keeps the last such row per code.
// NOTE(review): this query runs right after the appends, while the three engine layers are
// driven by subscriptions — `result` may not be fully populated yet; confirm whether a wait is needed.
res1 = select * from result context by code having TradeTime = max(TradeTime) limit -1