-
Notifications
You must be signed in to change notification settings - Fork 80
Expand file tree
/
Copy pathappendix_8.1_case1_daily.dos
More file actions
215 lines (178 loc) · 8.37 KB
/
appendix_8.1_case1_daily.dos
File metadata and controls
215 lines (178 loc) · 8.37 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
/*
* 模拟数据,如若您有日线数据可以直接入库,或者也可以从快照或tick中计算取数
* 数据跨度为10年,4000只股票
* 一共9个字段,10年的数据压缩前大约为1G,分年导入。
* 此处创建一个按年RANGE分区的数据库,每个分区压缩前大约100M
* 本例涉及的因子是Alpha 1和 Alpha 98,日频数据一般采用面板模式计算,或者SQL模式计算,继而可以直接写入对应的单值或者多值结果表。
*/
// Create the daily K-line database and its partitioned table.
// If a database named dbName already exists it is dropped and rebuilt from scratch.
//   dbName: DFS database path, e.g. "dfs://k_day_level"
//   tbName: name of the partitioned table to create inside that database
def createDayDbTable(dbName, tbName){
    // Start from a clean slate: remove any previous copy of the database.
    if(existsDatabase(dbName)){
        dropDatabase(dbName)
    }
    // RANGE boundaries 2000.01M, 2001.01M, ..., 2030.01M (12-month steps).
    yearBounds = 2000.01M + (0..30)*12
    // Empty in-memory table that only carries the schema of the daily bars.
    colNames = `securityid`tradetime`open`close`high`low`vol`val`vwap
    colTypes = [SYMBOL,TIMESTAMP,DOUBLE,DOUBLE,DOUBLE,DOUBLE,INT,DOUBLE,DOUBLE]
    schema = table(1:0, colNames, colTypes)
    dayDb = database(dbName, RANGE, yearBounds)
    // The table is partitioned on the tradetime column.
    createPartitionedTable(dayDb, schema, tbName, `tradetime)
}
// Simulated-data generator: builds one calendar year of daily bars for 4000
// synthetic securities ("sz000001" .. "sz004000") and appends them to
// loadTable("dfs://k_day_level","k_day").
//   n: year offset from 2010 (n = 0 generates 2010, n = 1 generates 2011, ...)
// NOTE(review): open/high/low/close are drawn independently, so a simulated row is
// not guaranteed to satisfy low <= open/close <= high.
def genKday(n){
    // First day of the target year; reused for the date range and the filter below.
    yearStart = temporalAdd(2010.01.01, n, "y")
    // Distinct business days of the year as a one-column table. The >= filter drops any
    // date that businessDay() moved before the start of the year (presumably when the
    // year begins on a non-business day - confirm against the businessDay documentation).
    tradetime = select * from table(timestamp(distinct(businessDay(yearStart..yearEnd(yearStart)))).sort() as tradetime) where tradetime >= yearStart
    securityid = "sz"+lpad(string(000001..004000), 6, `0)
    // Cross join: one row per (security, trading day).
    tmpTable = cj(table(securityid as securityid), tradetime)
    // Number of rows to simulate; taken from the cross join so it always matches it.
    rowCount = size(tmpTable)
    open = rand(100.0, rowCount)
    high = open + rand(1.0, rowCount)
    low = high - rand(2.0, rowCount)
    close = open + norm(0, 2, rowCount)
    vol = rand(100000, rowCount)
    val = close*vol
    vwap = close
    // Column-wise merge; the resulting column order matches the k_day table schema.
    resTable = tmpTable join table(open, close, high, low, vol, val, vwap)
    // Drop references to the intermediate vectors before the write so that only
    // resTable is still held while appending.
    tradetime = NULL
    securityid = NULL
    tmpTable = NULL
    open = NULL
    high = NULL
    low = NULL
    close = NULL
    vol = NULL
    val = NULL
    vwap = NULL
    db = loadTable("dfs://k_day_level","k_day")
    db.append!(resTable)
}
// Submit one background job per simulated year; job n runs genKday(n).
//   numOfYear: number of years to generate, starting at offset 0
// Returns the vector of job IDs reported by submitJob (empty when numOfYear <= 0).
// The jobs run asynchronously: callers must wait for them before reading the data.
def writeInDayByYear(numOfYear){
    jobIds = array(STRING, 0)
    // Guard: 0..(numOfYear - 1) counts downwards when numOfYear <= 0 (e.g. 0..-1 is 0, -1),
    // which would submit jobs for unintended year offsets.
    if (numOfYear <= 0){
        return jobIds
    }
    for (n in 0..(numOfYear - 1)){
        jobName = "genKday_" + string(n)
        // The same string is used as job ID and as job description.
        jobIds.append!(submitJob(jobName, jobName, genKday, n))
    }
    return jobIds
}
// Factor definitions: Alpha 1 and Alpha 98, each in SQL mode and in panel mode.
// Alpha 1
// Panel mode
//   close: matrix of close prices; the row-wise rank below treats each row as one
//          cross-section (callers in this file pass a tradetime x securityid pivot).
// Returns a matrix of the same layout holding the percent row-rank minus 0.5.
def alpha1Panel(close){
    // Period-over-period return of every column.
    ret = ratios(close) - 1
    // Negative return -> 20-period moving std of returns, otherwise the close itself; then squared.
    signal = pow(iif(ret < 0, mstd(ret, 20), close), 2.0)
    // Position of the maximum inside a 5-period moving window, ranked across each row.
    return rowRank(X=mimax(signal, 5), percent=true) - 0.5
}
// Alpha 1, SQL mode
//   t: table with at least the columns tradetime, securityid and close
// Returns a table (tradetime, securityid, factorname, val) with factorname = "alpha1".
def alpha1SQL(t){
    // Step 1 - time-series part, evaluated separately for every security.
    perSecurity = select tradetime, securityid, mimax(pow(iif(ratios(close) - 1 < 0, mstd(ratios(close) - 1, 20), close), 2.0), 5) as val from t context by securityid
    // Step 2 - cross-sectional part: percent rank within every tradetime, centred by 0.5.
    ranked = select tradetime, securityid, `alpha1 as factorname, rank(val, percent=true) - 0.5 as val from perSecurity context by tradetime
    return ranked
}
// Alpha 98
// Panel mode - input preparation
//   raw_data:   table with the columns tradetime, securityid, vwap, vol and open
//   start_time: first date (inclusive) of the window to load
//   end_time:   last date (inclusive) of the window to load
// Returns a dictionary with the keys vwap, open and vol; each value is the
// corresponding panel produced by panel(tradetime, securityid, ...).
def prepareDataForDDBPanel(raw_data, start_time, end_time){
    window = select tradetime,securityid, vwap,vol,open from raw_data where date(tradetime) between start_time : end_time
    // The metric order here must match the key order used for the dictionary below.
    panels = panel(window.tradetime, window.securityid, [window.vwap, window.open, window.vol])
    return dict(`vwap`open`vol, panels)
}
// Alpha 98, panel mode
//   vwap, open, vol: matrices of identical layout (callers in this file pass the
//                    panels built by prepareDataForDDBPanel).
// Returns the difference between the percent row-ranks of the two weighted-average terms.
def alpha98Panel(vwap, open, vol){
    // 5- and 15-period moving averages of volume.
    adv5 = mavg(vol, 5)
    adv15 = mavg(vol, 15)
    // Term 1: 5-period moving correlation of vwap with the 26-period moving sum of adv5,
    // averaged with weights 1..7.
    decay7 = mavg(mcorr(vwap, msum(adv5, 26), 5), 1..7)
    // Term 2: row-wise ranks of open and adv15 -> 21-period moving correlation ->
    // 9 minus the position of the minimum in a 9-period window -> 7-period moving rank ->
    // averaged with weights 1..8.
    rankOpen = rowRank(X=open, percent=true)
    rankAdv15 = rowRank(X=adv15, percent=true)
    decay8 = mavg(mrank(9 - mimin(mcorr(rankOpen, rankAdv15, 21), 9), true, 7), 1..8)
    return rowRank(X=decay7, percent=true) - rowRank(X=decay8, percent=true)
}
// Alpha 98, SQL mode
//   t: table with at least the columns tradetime, securityid, vwap, vol and open
// Returns a table (tradetime, securityid, factorname, val) with factorname = "alpha98".
// The input table is left untouched.
def alpha98SQL(t){
    // Work on a private copy: the helper columns below must not be added to the caller's
    // table, and function arguments are read-only unless declared `mutable`.
    work = select * from t
    // Moving averages of volume, per security.
    update work set adv5 = mavg(vol, 5), adv15 = mavg(vol, 15) context by securityid
    // Cross-sectional percent ranks, per trading date.
    update work set rank_open = rank(X = open,percent=true), rank_adv15 = rank(X=adv15,percent=true) context by date(tradetime)
    // The two weighted-average terms, again per security.
    update work set decay7 = mavg(mcorr(vwap, msum(adv5, 26), 5), 1..7), decay8 = mavg(mrank(9 - mimin(mcorr(rank_open, rank_adv15, 21), 9), true, 7), 1..8) context by securityid
    // Final factor: difference of the cross-sectional percent ranks of the two terms.
    return select tradetime,securityid, `alpha98 as factorname, rank(X =decay7,percent=true)-rank(X =decay8,percent=true) as val from work context by date(tradetime)
}
// Compute the Alpha 1 factor in panel mode.
// Reads close prices from loadTable("dfs://k_day_level","k_day") for
// 2010.01.01 .. 2020.01.31 and returns the result of alpha1Panel.
def calculateAlpha1InPanel(){
    kDay = loadTable("dfs://k_day_level","k_day")
    // pivot by turns the close column into a tradetime x securityid matrix.
    closePanel = exec close from kDay where date(tradetime) between 2010.01.01 : 2020.01.31 pivot by tradetime, securityid
    return alpha1Panel(closePanel)
}
// Compute the Alpha 1 factor in SQL mode.
// Reads tradetime, securityid and close from loadTable("dfs://k_day_level","k_day")
// for 2010.01.01 .. 2010.12.31 and returns the table produced by alpha1SQL.
def calculateAlpha1InSQL(){
    kDay = loadTable("dfs://k_day_level","k_day")
    bars = select tradetime,securityid, close from kDay where date(tradetime) between 2010.01.01 : 2010.12.31
    return alpha1SQL(bars)
}
// Compute the Alpha 98 factor in panel mode.
// Builds the vwap/open/vol panels for 2010.01.01 .. 2019.12.31 from
// loadTable("dfs://k_day_level","k_day") and returns the result of alpha98Panel.
def calculateAlpha98InPanel(){
    kDay = loadTable("dfs://k_day_level","k_day")
    panels = prepareDataForDDBPanel(kDay, 2010.01.01, 2019.12.31)
    return alpha98Panel(panels.vwap, panels.open, panels.vol)
}
// Compute the Alpha 98 factor in SQL mode.
// Reads tradetime, securityid, vwap, vol and open from
// loadTable("dfs://k_day_level","k_day") for 2010.01.01 .. 2019.12.31 and
// returns the table produced by alpha98SQL.
def calculateAlpha98InSQL(){
    kDay = loadTable("dfs://k_day_level","k_day")
    bars = select tradetime,securityid, vwap,vol,open from kDay where date(tradetime) between 2010.01.01 : 2019.12.31
    return alpha98SQL(bars)
}
/*
因子数据库表结构定义:
单值存储表loadTable("dfs://K_FACTOR_VERTICAL","factor_k")
存储分钟频率、日频因子。采用OLAP引擎。按年,以及因子名VALUE分区
多值宽表loadTable("dfs://K_FACTOR_WIDE","factor_k_wide")
宽表的列分别为时间列,因子名,各标的名称。采用TSDB引擎。按年,以及因子名VALUE分区
*/
// Create the single-value ("vertical") factor database and table: one row per
// (tradedate, securityid, factorname) holding a single val.
// If a database named dbName already exists it is dropped and rebuilt from scratch.
//   dbName: DFS database path, e.g. "dfs://K_FACTOR_VERTICAL"
//   tbName: name of the partitioned table to create
def createFactorVerticalDbTable(dbName, tbName){
    if(existsDatabase(dbName)){
        dropDatabase(dbName)
    }
    // First level: RANGE on time with boundaries 2000.01M, 2001.01M, ..., 2030.01M.
    dbDate = database("", RANGE, 2000.01M + (0..30)*12)
    // Second level: VALUE on factor name. The factor names this script actually writes
    // (alpha1, alpha98) are declared up front so the appends do not rely on the server
    // being configured to add unknown VALUE partitions automatically
    // (newValuePartitionPolicy). factor1/factor2 are kept from the original scheme.
    dbSymbol = database("", VALUE, `alpha1`alpha98`factor1`factor2)
    db = database(directory=dbName, partitionType=COMPO, partitionScheme=[dbDate,dbSymbol])
    // Empty in-memory table that only carries the schema.
    mtable = table(1:0, `tradedate`securityid`factorname`val, [TIMESTAMP,SYMBOL,SYMBOL,DOUBLE]);
    factorTable = db.createPartitionedTable(table=mtable, tableName=tbName, partitionColumns=`tradedate`factorname)
}
// Create the multi-value ("wide") factor database and table using the TSDB engine:
// one row per (tradetime, factorname) and one DOUBLE column per security
// ("sz000001" .. "sz004000").
// If a database named dbName already exists it is dropped and rebuilt from scratch.
//   dbName: DFS database path, e.g. "dfs://K_FACTOR_WIDE"
//   tbName: name of the partitioned table to create
def createFactorWideDbTable(dbName, tbName){
    if(existsDatabase(dbName)){
        dropDatabase(dbName)
    }
    // First level: RANGE on time with boundaries 2000.01M, 2001.01M, ..., 2030.01M.
    dbDate = database("", RANGE, 2000.01M + (0..30)*12)
    // Second level: VALUE on factor name. The factor names this script actually writes
    // (alpha1, alpha98) are declared up front so the appends do not rely on the server
    // being configured to add unknown VALUE partitions automatically
    // (newValuePartitionPolicy). factor1/factor2 are kept from the original scheme.
    dbSymbol = database("", VALUE, `alpha1`alpha98`factor1`factor2)
    db = database(directory=dbName, partitionType=COMPO, partitionScheme=[dbDate,dbSymbol],engine="TSDB")
    // Schema: tradetime, factorname, then 4000 security columns of type DOUBLE.
    baseColNames = `tradetime`factorname join ("sz"+lpad(string(000001..004000), 6, `0))
    baseColType = ([TIMESTAMP,SYMBOL]).append!(take(DOUBLE,4000))
    mtable = table(1:0, baseColNames,baseColType);
    // Sorted by (factorname, tradetime); keepDuplicates=LAST is passed so that, per sort
    // key, only the latest written row is kept.
    factorTable = db.createPartitionedTable(table=mtable, tableName=tbName, partitionColumns=`tradetime`factorname,sortColumns=`factorname`tradetime,compressMethods={tradetime:"delta"},keepDuplicates=LAST)
}
// Create every database/table used by this script. Each create* helper drops an
// existing database of the same name first, so calling this resets all three stores.
def createAllTables(){
    // Source data: daily K-line bars.
    dayDbName = "dfs://k_day_level"
    createDayDbTable(dayDbName, "k_day")
    // Factor results, single-value (vertical) model.
    verticalDbName = "dfs://K_FACTOR_VERTICAL"
    createFactorVerticalDbTable(verticalDbName, "factor_k")
    // Factor results, multi-value (wide) model.
    wideDbName = "dfs://K_FACTOR_WIDE"
    createFactorWideDbTable(wideDbName, "factor_k_wide")
}
// Shared helper: convert a factor panel (matrix with row and column labels) into a
// wide table and append it to loadTable("dfs://K_FACTOR_WIDE","factor_k_wide").
//   factorPanel: labelled matrix; row labels become the tradetime column, column
//                labels become the names of the value columns
//   factorName:  value written into the factorname column of every row
def writePanelFactorInWideTable(factorPanel, factorName){
    // Column names of the wide table: "tradetime" followed by the panel's column labels.
    wideNames = array(STRING,0,100).append!(["tradetime"]).append!(factorPanel.colNames())
    wide = table(factorPanel.rowNames() as tradetime, factorPanel).rename!(wideNames)
    wide[`factorname] = factorName
    // Move tradetime and factorname to the front, as in the target table.
    reorderColumns!(wide, `tradetime`factorname)
    loadTable("dfs://K_FACTOR_WIDE","factor_k_wide").append!(wide)
}
// Store the Alpha 1 panel result in the wide factor table under the name "alpha1".
def writeAlpha1InWideTable(resAlpha1Panel){
    writePanelFactorInWideTable(resAlpha1Panel, `alpha1)
}
// Store the Alpha 98 panel result in the wide factor table under the name "alpha98".
def writeAlpha98InWideTable(alpha98DDBPanel){
    writePanelFactorInWideTable(alpha98DDBPanel, `alpha98)
}
// 1. Create the daily K-line table, the single-value factor table and the
//    multi-value (wide) factor table
createAllTables()
// 2. Simulate and write 10 years of data (one background job per year)
submitTime = now()
writeInDayByYear(10)
// The generator jobs run asynchronously. Poll until every genKday_* job submitted by
// this run has an end time, instead of assuming a fixed sleep is long enough;
// otherwise the factor calculations below could read a partially written table.
do {
    sleep(1000)
    pendingJobs = exec count(*) from getRecentJobs() where startsWith(jobId, "genKday_"), receivedTime >= submitTime, isNull(endTime)
} while (pendingJobs > 0)
allJobs=select * from getRecentJobs() order by endTime desc
// Stop here if any generator job of this run reported an error.
failedJobs = select jobId, errorMsg from allJobs where startsWith(jobId, "genKday_"), receivedTime >= submitTime, isValid(errorMsg)
if (size(failedJobs) > 0){
    throw "Simulated data generation failed for job(s): " + concat(failedJobs.jobId, ",")
}
// 3. Compute the factors and write them to the factor tables
// 3.1 Compute
// Alpha 1, panel mode
resAlpha1Panel=calculateAlpha1InPanel()
// Alpha 1, SQL mode
alpha1DDBSql=calculateAlpha1InSQL()
// Alpha 98, panel mode
alpha98DDBPanel=calculateAlpha98InPanel()
// Alpha 98, SQL mode
alpha98DDBSql=calculateAlpha98InSQL()
// 3.2 Write to the factor result tables
// Single-value model: results computed in SQL mode can be appended directly
loadTable("dfs://K_FACTOR_VERTICAL","factor_k").append!(alpha1DDBSql)
loadTable("dfs://K_FACTOR_VERTICAL","factor_k").append!(alpha98DDBSql)
// Multi-value model: results computed in panel mode only need to be converted to a
// table before being appended
// Alpha 1: convert to a wide table and store
writeAlpha1InWideTable(resAlpha1Panel)
// Alpha 98: convert to a wide table and store
writeAlpha98InWideTable(alpha98DDBPanel)