-
Notifications
You must be signed in to change notification settings - Fork 80
Expand file tree
/
Copy path2. IOPV_realtime_single.dos
More file actions
71 lines (61 loc) · 3.45 KB
/
2. IOPV_realtime_single.dos
File metadata and controls
71 lines (61 loc) · 3.45 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
/*
* 程序名: 单只ETF实时 IOPV 计算
* 作者: DolphinDB
* 时间: 2022-06-15
* 程序说明: 本程序为 DolphinDB 单只 ETF 基金 IOPV 实时计算
* 基金份额参考净值(原始公式)= {∑(pi*sharesi) * 10 + AI + ECC} /10000
* 基金份额参考净值(本案例公式)= {∑(pi*sharesi) /1000}
*/
// 登录系统并清除缓存
login("admin","123456")
clearAllCache()
go
// 环境准备
try{
dropAggregator(`IOPV_calculator)
unsubscribeTable(tableName="TradeStreamData", actionName="trade_subscribe")
unsubscribeTable(tableName="IOPVStreamResult", actionName="IOPV_mq_read")
dropStreamTable("TradeStreamData")
dropStreamTable("IOPVStreamResult")
}catch(ex){}
try{
loadPlugin("plugins/zmq/PluginZmq.txt")
}catch(ex){}
/*
* 构建基金组合
*/
// 构建组合成分券,设置为50只成分券(*注 本组合参考西部利得创业板大盘交易型开放式基金)
symbols = `300073`300363`300373`300474`300682`300769`301029`300390`300957`300763`300979`300919`300037`300832`300866`300896`300751`300223`300676`300274`300413`300496`300661`300782`300142`300759`300595`300285`300146`300207`300316`300454`300529`300699`300628`300760`300601`300450`300433`300408`300347`300124`300122`300059`300033`300015`300014`300012`300003`300750
// 通过rand随机函数,为50只成分券设置持仓手数
positions = rand(76339..145256, 50)
// 构建基金
portfolio = dict(symbols, positions)
/*
* 创建pub流表,流计算引擎,sub流表
*/
// 创建pub流表 TradeStreamData 用于模拟回放 Level2 逐笔成交价格实时行情
t = streamTable(100:0, `SecurityID`tradedate`tradetime`price,[SYMBOL, DATE,TIMESTAMP,DOUBLE])
enableTableShareAndPersistence(table=t, tableName=`TradeStreamData, cacheSize=1000000)
go
// 创建sub流表 IOPVStreamResult 用于接收 IOPV 实时计算结果
share streamTable(1000:0, `tradetime`tradedate`IOPV, [TIMESTAMP,DATE,DOUBLE]) as IOPVStreamResult
// 创建名为 IOPV_calculator的横截面聚合流计算引擎 IOPV_engine, 在该引擎中使用元编程计算 IOPV。
IOPV_engine = createCrossSectionalEngine(name="IOPV_calculator", metrics=[<last(tradedate)>, <sum(ffill(price) * portfolio[SecurityID]/1000)>], dummyTable=TradeStreamData, outputTable=IOPVStreamResult, keyColumn=`SecurityID, triggeringPattern='perRow', timeColumn=`tradetime, useSystemTime=false)
// 创建流表订阅事件trade_subscribe,过滤非组合内的股票,同时该订阅使用IOPV_engine计算引擎。
setStreamTableFilterColumn(TradeStreamData, `SecurityID)
subscribeTable(tableName="TradeStreamData", actionName="trade_subscribe", offset=0, handler=append!{IOPV_engine}, msgAsTable=true, batchSize=10000, throttle=1, hash=0, filter=portfolio.keys());
/*
* 主程序
*/
//python3 subZmq.py
try{
formatter = zmq::createJSONFormatter()
socket = zmq::socket("ZMQ_PUB", formatter)
zmq::bind(socket, "tcp://*:20414")
}catch(ex){}
subscribeTable(tableName="IOPVStreamResult", actionName="IOPV_mq_read", offset=0, handler=zmq::send{socket}, msgAsTable=true)
//trs = cutPoints(09:30:00.000..16:00:00.000, 60)
rds = replayDS(<select SecurityID, tradedate, tradetime , price from loadTable("dfs://LEVEL2_SZ","Trade") where tradedate = 2020.12.01, price>0 >, `tradedate, `tradetime, cutPoints(09:30:00.000..16:00:00.000, 60));
submitJob("replay_order", "replay_trades_stream", replay, rds, `TradeStreamData, `tradedate, `tradetime, 1000000, true, 4)
result = select * from IOPVStreamResult;
getStreamingStat().subWorkers