-
Notifications
You must be signed in to change notification settings - Fork 80
Expand file tree
/
Copy pathappendix_8.2_case2_minute.dos
More file actions
230 lines (190 loc) · 8.89 KB
/
appendix_8.2_case2_minute.dos
File metadata and controls
230 lines (190 loc) · 8.89 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
/*
* 模拟数据,如若您有分钟k线数据可以直接入库,或者也可以从快照或tick中计算取数
* 数据跨度为1年,4000只股票,每只股票工作日的分钟线从9:30 ~11:30, 13:00 ~15:00
* 一共9个字段,一年的数据压缩前大约为20G,分月导入。
* 此处创建一个按月VALUE分区,按股票HASH的组合分区,每个分区压缩前大约500M
* 本例涉及的因子是日内收益偏度因子和doubleEMA,分钟频率的数据一般采用SQL模式计算,这样可以在分区内部并行计算。
*/
//Create the minute-bar database and its partitioned table.
//If a database already exists at dbName it is dropped first, so existing data is lost.
//  dbName: DFS path of the database to (re)create
//  tbName: name of the partitioned table created inside it
def createMinuteDbTable(dbName,tbName){
    if(existsDatabase(dbName)) dropDatabase(dbName)
    //Empty template carrying the 9-column minute-bar schema
    colNames = `securityid`tradetime`open`close`high`low`vol`val`vwap
    colTypes = [SYMBOL,TIMESTAMP,DOUBLE,DOUBLE,DOUBLE,DOUBLE,INT,DOUBLE,DOUBLE]
    schemaTb = table(1:0, colNames, colTypes)
    //Level 1: VALUE partition per month of 2020; level 2: 3 HASH buckets on a SYMBOL key
    monthDb = database("", VALUE, 2020.01M..2020.12M)
    symDb = database("", HASH, [SYMBOL,3])
    compoDb = database(dbName, COMPO, [monthDb, symDb])
    //Partition columns: tradetime for the month level, securityid for the hash level
    compoDb.createPartitionedTable(schemaTb, tbName, `tradetime`securityid)
}
//Simulate one month of minute bars and append them to the table k_minute in dfs://k_minute_level.
//  n: month offset added to 2020.01.01 (0 -> January 2020, 1 -> February 2020, ...)
//The original note states that simulating and storing all 12 months takes about one minute.
def genKminute(n){
    monthStart = temporalAdd(2020.01.01, n, "M")
    //Distinct, sorted businessDay() values over every calendar day of the month;
    //the filter drops any resulting day that falls before the month start
    bizDays = distinct(businessDay(monthStart..monthEnd(monthStart))).sort()
    dayTb = select * from table(bizDays as tradeDate) where tradeDate >= monthStart
    //Intraday minute grid for the two sessions (per the file header: 09:30-11:30 and 13:00-15:00)
    minuteTb = table(09:30:00.000+0..120*60*1000 join (13:00:00.000+0..120*60*1000) as tradeMin)
    //Every (day, minute) combination as a single tradetime column
    stampTb = select concatDateTime(tradeDate,tradeMin) as tradetime from cj(dayTb,minuteTb)
    codes = "sz"+lpad(string(000001..004000), 6, `0)
    //Every (security, tradetime) combination
    keyTb = cj(table(codes as securityid), stampTb)
    //Row count of the cross join: 4000 securities x number of timestamps
    rowCnt = size(stampTb)*4000
    openPx = rand(100.0, rowCnt)
    highPx = openPx + rand(1.0, rowCnt)
    lowPx = highPx - rand(2.0, rowCnt)
    closePx = openPx + norm(0, 2, rowCnt)
    volume = rand(100000, rowCnt)
    turnover = closePx*volume
    //vwap is simply a copy of close in this simulation
    resTable = keyTb join table(openPx as open, closePx as close, highPx as high, lowPx as low, volume as vol, turnover as val, closePx as vwap)
    //Clear the intermediates before the write (presumably to release memory)
    dayTb = NULL
    minuteTb = NULL
    stampTb = NULL
    codes = NULL
    keyTb = NULL
    openPx = NULL
    highPx = NULL
    lowPx = NULL
    closePx = NULL
    volume = NULL
    turnover = NULL
    loadTable("dfs://k_minute_level","k_minute").append!(resTable)
}
//Submit one batch job per month to simulate and store minute bars.
//  numOfMonth: number of months to generate; month offsets 0 .. numOfMonth-1 are passed to genKminute
//Each job is submitted with submitJob; this function does not wait for the jobs to finish.
//Nothing is submitted when numOfMonth is not positive.
def writeInMinuteByMonth(numOfMonth){
    //0..(numOfMonth-1) is a DESCENDING sequence when numOfMonth <= 0 (e.g. 0..-1 is [0,-1]),
    //which would submit jobs for months that were never requested
    if(numOfMonth <= 0) return
    for (n in 0..(numOfMonth-1)){
        //The same string is used as job id and job description
        jobName = "genKminute_"+string(n)
        submitJob(jobName, jobName, genKminute, n)
    }
}
/*
* 因子定义:
* 此节用到了两个因子,分别为日内收益率偏度dayReturnSkew以及DoubleEMA
*/
//dayReturnSkew: aggregate function returning the skewness of the element-to-previous-element ratios of close
defg dayReturnSkew(close){
    barRatios = ratios(close)
    return skew(barRatios)
}
//DoubleEMA helper: returns (x - y) divided by (x + y) using the ratio operator
def sum_diff(x, y){
    diffPart = x - y
    sumPart = x + y
    return diffPart \ sumPart
}
//DoubleEMA factor.
//  price: price series the factor is computed on
//Steps: take the 20- and 40-period EMAs of price, form 1000 * sum_diff of the two,
//then return the 10-period EMA of that series minus its 20-period EMA.
def factorDoubleEMA(price){
    fastEma = ema(price, 20)
    slowEma = ema(price, 40)
    scaledSpread = 1000 * sum_diff(fastEma, slowEma)
    shortSmooth = ema(scaledSpread, 10)
    longSmooth = ema(scaledSpread, 20)
    return shortSmooth - longSmooth
}
/*
因子数据库表结构定义:
单值存储表loadTable("dfs://K_FACTOR_VERTICAL","factor_k")
存储分钟频率、日频因子。采用OLAP引擎。按年,以及因子名VALUE分区
多值宽表loadTable("dfs://K_FACTOR_WIDE","factor_k_wide")
宽表的列分别为时间列,因子名,各标的名称。采用TSDB引擎。按年,以及因子名VALUE分区
*/
//Create the vertical (single-value) factor database and table: one row per (tradedate, securityid, factorname).
//If a database already exists at dbName it is dropped first, so existing data is lost.
//  dbName: DFS path of the database to (re)create
//  tbName: name of the partitioned table created inside it
def createFactorVerticalDbTable(dbName, tbName){
    if(existsDatabase(dbName)) dropDatabase(dbName)
    //RANGE boundaries every 12 months starting at 2000.01M (31 boundaries)
    yearBounds = 2000.01M + (0..30)*12
    dateDb = database("", RANGE, yearBounds)
    //VALUE partitions on the factor name, initialised with two placeholder names
    factorDb = database("", VALUE, `factor1`factor2)
    compoDb = database(directory=dbName, partitionType=COMPO, partitionScheme=[dateDb,factorDb])
    schemaTb = table(1:0, `tradedate`securityid`factorname`val, [TIMESTAMP,SYMBOL,SYMBOL,DOUBLE])
    factorTb = compoDb.createPartitionedTable(table=schemaTb, tableName=tbName, partitionColumns=`tradedate`factorname)
}
//Create the wide (multi-value) factor database and table on the TSDB engine.
//Columns: tradetime, factorname, then one DOUBLE column per security (sz000001 .. sz004000).
//If a database already exists at dbName it is dropped first, so existing data is lost.
//  dbName: DFS path of the database to (re)create
//  tbName: name of the partitioned table created inside it
def createFactorWideDbTable(dbName, tbName){
    if(existsDatabase(dbName)) dropDatabase(dbName)
    //RANGE boundaries every 12 months starting at 2000.01M, combined with VALUE partitions on the factor name
    dateDb = database("", RANGE, 2000.01M + (0..30)*12)
    factorDb = database("", VALUE, `factor1`factor2)
    tsdb = database(directory=dbName, partitionType=COMPO, partitionScheme=[dateDb,factorDb], engine="TSDB")
    //Two key columns followed by 4000 per-security value columns
    securityCols = "sz"+lpad(string(000001..004000), 6, `0)
    colNames = `tradetime`factorname join securityCols
    colTypes = ([TIMESTAMP,SYMBOL]).append!(take(DOUBLE,4000))
    schemaTb = table(1:0, colNames, colTypes)
    //Sorted by (factorname, tradetime); keepDuplicates=LAST is passed so the latest row wins on duplicate sort keys
    wideTb = tsdb.createPartitionedTable(table=schemaTb, tableName=tbName, partitionColumns=`tradetime`factorname, sortColumns=`factorname`tradetime, compressMethods={tradetime:"delta"}, keepDuplicates=LAST)
}
//Create the factor-function configuration table factor_func_config in dfs://k_minute_level
//(dropping it first if it exists) and fill it with 20 sample rows.
//Each row holds: func_name (function to call), factor_name (name stored with the result),
//start_date and end_date (date range to compute).
def createFactorConfigTable(){
    db=database("dfs://k_minute_level")
    if(existsTable("dfs://k_minute_level", "factor_func_config"))
        db.dropTable("factor_func_config")
    factor_tab_model=table(1:0,`func_name`factor_name`start_date`end_date,[STRING,STRING,DATE,DATE])
    db.createTable(table=factor_tab_model, tableName=`factor_func_config)
    factor_func_config=loadTable("dfs://k_minute_level",`factor_func_config)
    //Sample configuration: factorDoubleEMA under factor names "1".."20", each over a two-day window in January 2020.
    //The column is named factor_name so the seed data matches the table schema above.
    func_table = table(take(`factorDoubleEMA,20) as func_name, string(1..20) as factor_name, 2020.01.01..2020.01.20 as start_date,2020.01.02..2020.01.21 as end_date)
    factor_func_config.append!(func_table)
}
//Generic entry point: evaluate the factor function named funcN on minute closes and
//append the result to the wide factor table factor_k_wide in dfs://K_FACTOR_WIDE.
//  funcN: name of the factor function; resolved with funcByName and applied to close per securityid
//  factorN: factor name written to the factorname column (converted with string())
//  start_date, end_date: date range of minute bars to read (between start_date : end_date)
//Returns without writing when no bars fall in the range.
def write_in_wide_database(funcN,factorN,start_date, end_date){
    //No label column is selected here: the factor name stored in the wide table comes from factorN below
    res = select tradetime, securityid, funcByName(funcN)(close) as factor_value from loadTable("dfs://k_minute_level","k_minute") where date(tradetime) between start_date : end_date context by securityid
    if(res.size()==0) return
    //Reshape to one row per tradetime and one column per security
    pivot_res = select factor_value from res pivot by tradetime,securityid
    pivot_res[`factorname]=string(factorN)
    //Move tradetime and factorname to the front
    reorderColumns!(pivot_res,`tradetime`factorname)
    res_t = loadTable("dfs://K_FACTOR_WIDE","factor_k_wide")
    res_t.append!(pivot_res)
}
//Create every database/table used by this example: minute bars, vertical factor table, wide factor table.
//Each of the three helpers drops its database first if it already exists.
def createAllTables(){
    minuteDbPath = "dfs://k_minute_level"
    verticalDbPath = "dfs://K_FACTOR_VERTICAL"
    wideDbPath = "dfs://K_FACTOR_WIDE"
    createMinuteDbTable(minuteDbPath, "k_minute")
    createFactorVerticalDbTable(verticalDbPath, "factor_k")
    createFactorWideDbTable(wideDbPath, "factor_k_wide")
}
//Compute the dayReturnSkew factor from minute bars dated 2020.01.02 to 2020.01.31.
//Returns a table with columns tradetime (the bar's date), securityid, factorname (`dayReturnSkew) and val,
//one row per (date, security) group.
def getMinReturn(){
    minuteBars = loadTable("dfs://k_minute_level","k_minute")
    //The trailing map keyword is kept from the original query
    skewByDay = select `dayReturnSkew as factorname, dayReturnSkew(close) as val from minuteBars where date(tradetime) between 2020.01.02 : 2020.01.31 group by date(tradetime) as tradetime, securityid map
    return skewByDay
}
//Store the dayReturnSkew factor in the wide factor table factor_k_wide in dfs://K_FACTOR_WIDE.
//  minReturn: table with columns tradetime, securityid and val (as returned by getMinReturn)
//Returns without writing when minReturn is empty.
def writeMinReturnFactorInWideTable(minReturn){
    //An empty input would pivot to a table without the per-security columns, which cannot match the wide schema
    if(minReturn.size()==0) return
    //Reshape to one row per tradetime and one column per security
    pivot_res = select val from minReturn pivot by tradetime, securityid
    //Label the rows with this factor's own name, the same name getMinReturn puts in its factorname column
    pivot_res[`factorname]=`dayReturnSkew
    //Move tradetime and factorname to the front
    reorderColumns!(pivot_res,`tradetime`factorname)
    loadTable("dfs://K_FACTOR_WIDE","factor_k_wide").append!(pivot_res)
}
//Compute the doubleEMA factor from minute bars dated 2020.01.01 to 2020.01.31.
//factorDoubleEMA is applied to close within each securityid (context by), so the result keeps one row per bar.
//Returns a table with columns tradetime, securityid, factorname (`doubleEMA) and val.
def getDoubleEMA(){
    minuteBars = loadTable("dfs://k_minute_level","k_minute")
    emaByBar = select tradetime, securityid, `doubleEMA as factorname, factorDoubleEMA(close) as val from minuteBars where date(tradetime) between 2020.01.01 : 2020.01.31 context by securityid
    return emaByBar
}
//Store the doubleEMA factor in the wide factor table factor_k_wide in dfs://K_FACTOR_WIDE.
//  doubleEMA: table with columns tradetime, securityid and val (as returned by getDoubleEMA)
//Returns without writing when doubleEMA is empty.
def writeDoubleEMAFactorInWideTable(doubleEMA){
    //An empty input would pivot to a table without the per-security columns, which cannot match the wide schema
    if(doubleEMA.size()==0) return
    //Reshape to one row per tradetime and one column per security
    pivot_res = select val from doubleEMA pivot by tradetime, securityid
    //Label the rows with this factor's own name, the same name getDoubleEMA puts in its factorname column
    pivot_res[`factorname]=`doubleEMA
    //Move tradetime and factorname to the front
    reorderColumns!(pivot_res,`tradetime`factorname)
    loadTable("dfs://K_FACTOR_WIDE","factor_k_wide").append!(pivot_res)
}
login("admin","123456")
//1. Create the minute-bar database/table, the single-value (vertical) factor table and the multi-value (wide) factor table
createAllTables()
//2. Simulate and write 12 months of data (one batch job per month)
writeInMinuteByMonth(12)
//Wait until every data-generation job has finished. The jobs are submitted with submitJob and may
//take longer than any fixed delay, so poll the job list instead of sleeping for a fixed time;
//otherwise the factor queries below can run against partially written data.
do{
    sleep(1000)
    pendingJobs = exec count(*) from getRecentJobs() where jobDesc like "genKminute_%" and isNull(endTime)
}while(pendingJobs > 0)
//allJobs=select * from getRecentJobs() order by endTime desc
//3.1 Compute the factors and store them in the single-value (vertical) model
//Compute the minute-return skew factor
minReturn=getMinReturn()
//Compute the doubleEMA factor
doubleEMA=getDoubleEMA()
//Write both factors to the vertical table
loadTable("dfs://K_FACTOR_VERTICAL","factor_k").append!(minReturn)
loadTable("dfs://K_FACTOR_VERTICAL","factor_k").append!(doubleEMA)
//3.2 Store the factors in the multi-value (wide) model
//Minute-return skew factor into the wide table
writeMinReturnFactorInWideTable(minReturn)
//DoubleEMA factor into the wide table
writeDoubleEMAFactorInWideTable(doubleEMA)
//4 Productionising the factor computation
//Save the factor function as a function view so scheduled jobs can call it
addFunctionView(factorDoubleEMA)
//Create the factor scheduling configuration table and its sample rows
createFactorConfigTable()
//daily_cal: submit one batch job per row of the factor configuration table; each job computes
//the configured factor for the configured date range and saves it to the wide table.
//Does nothing when the configuration table is empty.
def daily_cal(){
    //Load the factor-function configuration table
    factor_func_config=select * from loadTable("dfs://k_minute_level","factor_func_config")
    cfgRows = size(factor_func_config)
    //With no rows, 0..(cfgRows-1) would be 0..-1, i.e. the descending sequence [0,-1],
    //and the loop would submit jobs built from non-existent rows
    if(cfgRows == 0) return
    for (i in 0..(cfgRows-1)){
        funcN=factor_func_config[`func_name][i]
        factorN=factor_func_config[`factor_name][i]
        start_date=factor_func_config[`start_date][i]
        end_date=factor_func_config[`end_date][i]
        //Job id and description are derived from the function name, factor name and start date
        submitJob("submitjob_sche"+funcN+factorN, "write_in_wide_database"+funcN+factorN+start_date, write_in_wide_database,funcN,factorN,start_date, end_date)
    }
}
//Register daily_cal as a scheduled job. Adjust the time and dates when adapting this example:
//as written, startDate and endDate are both 2022.05.08 and the scheduled time is 16:51.
scheduleJob(jobId=`daily1, jobDesc="Daily Job 1", jobFunc=daily_cal, scheduleTime=16:51m, startDate=2022.05.08, endDate=2022.05.08, frequency='D');