-
Notifications
You must be signed in to change notification settings - Fork 80
Expand file tree
/
Copy pathshapshot数据流式计算.dos
More file actions
38 lines (27 loc) · 1.9 KB
/
shapshot数据流式计算.dos
File metadata and controls
38 lines (27 loc) · 1.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/*
 * wavgSOIRStream: streaming factor that standardizes a depth-weighted
 * order-imbalance ratio against its own recent history.
 *
 * Parameters
 *   bidQty - bid-side quantities per price level (the caller in this file
 *            passes the DOUBLE[] column bidOrderQty)
 *   askQty - ask-side quantities per price level (the caller in this file
 *            passes the DOUBLE[] column OfferOrderQty)
 *   lag    - look-back length; the moving statistics below use a window of
 *            (lag-1) over the previous values. Defaults to 20.
 *
 * Returns
 *   (Imbalance - moving mean) / moving population std, forward-filled, with
 *   any remaining NULL replaced by 0.
 *
 * Declared with @state so it can be used as a metric of a reactive state
 * engine.
 */
@state
def wavgSOIRStream(bidQty,askQty,lag=20){
// Per-row weighted average of (bid-ask)/(bid+ask) across levels, using weights 10..1
// in level order (assumes exactly 10 levels with the first level weighted most - TODO confirm).
Imbalance_=rowWavg((bidQty-askQty)\(bidQty+askQty),
10 9 8 7 6 5 4 3 2 1)
// Forward-fill NULLs (e.g. from a zero denominator); anything still NULL becomes 0.
Imbalance= ffill(Imbalance_).nullFill(0)
// Moving mean of the PREVIOUS values (current row excluded via prev); third argument 2 = minimum periods.
mean = mavg(prev(Imbalance), (lag-1), 2)
// Moving population std of the previous values. The input is scaled by 1e6 and the result scaled
// back - presumably to limit floating-point precision loss on very small values (TODO confirm).
std = mstdp(prev(Imbalance) *1000000, (lag-1),2) \ 1000000
// When std is at least 1e-7 emit the z-score; otherwise the value is produced by cumlastNot
// over earlier outputs (i.e. the previous non-NULL factor value is carried forward -
// see conditionalIterate documentation to confirm).
factorValue = conditionalIterate(std >= 0.0000001,
(Imbalance - mean) \ std, cumlastNot)
// Final clean-up: forward-fill and default leading NULLs to 0.
return ffill(factorValue).nullFill(0)
}
// ---------------------------------------------------------------------------
// Streaming factor demo: feed snapshot rows into a reactive state engine and
// collect four factor columns per SecurityID.
// timeWeightedOrderSlope, level10_InferPriceTrend and
// traPriceWeightedNetBuyQuoteVolumeRatio are not defined in this section and
// must already exist in the session.
// ---------------------------------------------------------------------------

// Engine metrics, in output-column order: the timestamp followed by four factors.
// Index [0] on an array-vector column selects the first price level.
metrics = [
    <DateTime>,
    <timeWeightedOrderSlope(bidPrice[0],bidOrderQty[0],OfferPrice[0],OfferOrderQty[0],20)>,
    <level10_InferPriceTrend(bidPrice,OfferPrice,bidOrderQty,OfferOrderQty,60,20)>,
    <traPriceWeightedNetBuyQuoteVolumeRatio(bidPrice[0],bidOrderQty[0],OfferPrice[0],OfferOrderQty[0],TotalValueTrade,TotalVolumeTrade,20)>,
    <wavgSOIRStream( bidOrderQty,OfferOrderQty,20)>
]

// Input stream table (shared as Streamdata): price/quantity ladders are array vectors.
// NOTE(review): TotalValueTrade is declared INT here - confirm it matches the source snapshot table.
inputColNames = `SecurityID`DateTime`bidPrice`OfferPrice`bidOrderQty`OfferOrderQty`TotalValueTrade`TotalVolumeTrade
inputColTypes = [STRING,TIMESTAMP,DOUBLE[],DOUBLE[],DOUBLE[],DOUBLE[],INT,INT]
share streamTable(1:0, inputColNames, inputColTypes) as Streamdata

// Output table: key column, timestamp, then one DOUBLE column per factor.
outputColNames = `SecurityID`DateTime`TimeWeightedOrderSlope`Level10_InferPriceTrend`TraPriceWeightedNetBuyQuoteVolumeRatio`height_Imbalance
outputColTypes = [STRING,TIMESTAMP,DOUBLE,DOUBLE,DOUBLE,DOUBLE]
result = table(1000:0, outputColNames, outputColTypes)

// Reactive state engine keyed by SecurityID; state is kept separately per key.
rse = createReactiveStateEngine(
    name="reactiveDemo",
    metrics=metrics,
    dummyTable=Streamdata,
    outputTable=result,
    keyColumn="SecurityID")

// Route every batch published to Streamdata into the engine.
subscribeTable(
    tableName="Streamdata",
    actionName="factors",
    handler=tableInsert{rse})

// Replay one trading day of snapshots for two securities from the DFS table.
snapshotTB = select * from loadTable("dfs://TSDB_snapshot","snapshot") where date(TradeTime)=2022.04.14 and SecurityID in [`600000,`000001]
// Project and rename to match the Streamdata schema (TradeTime -> DateTime).
data1 = select SecurityID, TradeTime as DateTime, bidPrice, OfferPrice, bidOrderQty, OfferOrderQty, TotalValueTrade, TotalVolumeTrade from snapshotTB
append!(Streamdata, data1)

// Inspect the factor output after 09:45:33.
// NOTE(review): the subscription may deliver asynchronously - confirm the engine has
// consumed all rows before relying on this query.
select * from result where DateTime>2022.04.14T09:45:33.000

// Teardown (left disabled): drop the engine and cancel the subscription before re-running.
//dropAggregator("reactiveDemo")
//unsubscribeTable(tableName=`Streamdata, actionName="factors")