-
Notifications
You must be signed in to change notification settings - Fork 80
Expand file tree
/
Copy pathalpha98InDDB.dos
More file actions
89 lines (72 loc) · 2.86 KB
/
alpha98InDDB.dos
File metadata and controls
89 lines (72 loc) · 2.86 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
// Create the DFS database and its partitioned daily K-line table.
// If a database already exists at dbName it is dropped first, then the
// database and the table are created again from scratch.
//   dbName: database path, e.g. "dfs://k_day_level"
//   tbName: name of the partitioned table to create in that database
def createDayDbTable(dbName, tbName){
    // Remove any previous database at this path so the definition below always applies.
    if(existsDatabase(dbName)){
        dropDatabase(dbName)
    }
    // RANGE partition boundaries: 31 month values spaced 12 months apart, starting at 2000.01M.
    monthBounds = 2000.01M + (0..30)*12
    dayDb = database(dbName, RANGE, monthBounds)
    // Empty in-memory table used only as the schema template for the partitioned table.
    colNames = `securityid`tradetime`open`close`high`low`vol`val`vwap
    colTypes = [SYMBOL,TIMESTAMP,DOUBLE,DOUBLE,DOUBLE,DOUBLE,INT,DOUBLE,DOUBLE]
    schemaTb = table(1:0, colNames, colTypes)
    // tradetime is the partitioning column.
    createPartitionedTable(dayDb, schemaTb, tbName, `tradetime)
}
// Generate one year of simulated daily K-line data and append it to the
// table "k_day" of database "dfs://k_day_level".
//   n: year offset added to 2010.01.01 (n = 0 starts at 2010.01.01, n = 1 at 2011.01.01, ...)
// One row is produced for every (securityid, tradetime) pair of the cross join
// between the 4000 generated security ids and the trading days of that year.
def genKday(n){
    // First day of the target year; computed once and reused below.
    baseDate = temporalAdd(2010.01.01, n, "y")
    // Distinct, sorted business days between baseDate and the end of that year, as timestamps.
    // The where clause drops any value that businessDay mapped to a day before baseDate.
    tradetime = select * from table(timestamp(distinct(businessDay(baseDate..yearEnd(baseDate)))).sort() as tradetime) where tradetime >= baseDate
    // Security ids "sz000001" .. "sz004000".
    securityid = "sz" + lpad(string(000001..004000), 6, `0)
    tmpTable = cj(table(securityid as securityid), tradetime)
    // Row count is taken from the cross join itself, so it cannot drift from the security list.
    rowCnt = size(tmpTable)
    open = rand(100.0, rowCnt)
    high = open + rand(1.0, rowCnt)
    low = high - rand(2.0, rowCnt)
    close = open + norm(0, 2, rowCnt)
    vol = rand(100000, rowCnt)
    val = close*vol
    // vwap is simply set to the close price in this simulation.
    vwap = close
    resTable = tmpTable join table(open, close, high, low, vol, val, vwap)
    // Clear the intermediates before the append (presumably to release memory
    // while several of these jobs run side by side - confirm).
    tradetime = NULL
    securityid = NULL
    tmpTable = NULL
    open = NULL
    high = NULL
    low = NULL
    close = NULL
    vol = NULL
    val = NULL
    vwap = NULL
    dest = loadTable("dfs://k_day_level","k_day")
    dest.append!(resTable)
}
// Submit one job per year to generate and write simulated daily data.
//   numOfYear: number of years to generate; year offsets 0 .. numOfYear-1 are passed to genKday.
// Each job is submitted through submitJob with "genKday_<offset>" as both job id and description.
// Nothing is submitted when numOfYear is 0 or negative.
def writeInDayByYear(numOfYear){
    // Guard first: the range 0..(numOfYear-1) would be the descending sequence 0 -1
    // for numOfYear = 0 and would submit two unwanted jobs.
    if (numOfYear > 0){
        // til(numOfYear) yields 0, 1, ..., numOfYear-1.
        for (n in til(numOfYear)){
            jobName = "genKday_" + string(n)
            submitJob(jobName, jobName, genKday, n)
        }
    }
}
//1. Create the daily K-line database and table (createDayDbTable drops an existing database at this path first)
createDayDbTable("dfs://k_day_level","k_day")
//2. Simulate and write 1 year of data
// NOTE(review): writeInDayByYear hands the work to submitJob, so the write presumably runs
// in the background - confirm the job has finished before querying the table below.
writeInDayByYear(1)
//alpha98 in SQL
// Compute the alpha98 factor with SQL statements.
//   stock: table with at least the columns securityid, tradetime, vwap, open and vol
// Returns a table with the columns securityid, tradetime and A98.
def alpha98(stock){
    // Per security: 5- and 15-row moving averages of volume.
    feat = select securityid, tradetime, vwap, open, mavg(vol, 5) as adv5, mavg(vol,15) as adv15 from stock context by securityid
    // Per tradetime: rank open and adv15 across the rows sharing that tradetime.
    update feat set openRank = rank(open), adv15Rank = rank(adv15) context by tradetime
    // Per security: the two moving-window terms; mavg is given the vectors 1..7 and 1..8 as its window argument.
    update feat set decayLeft = mavg(mcorr(vwap, msum(adv5, 26), 5), 1..7), decayRight = mavg(mrank(9 - mimin(mcorr(openRank, adv15Rank, 21), 9), true, 7), 1..8) context by securityid
    // Per tradetime: the factor is the difference of the two ranks.
    return select securityid, tradetime, rank(decayLeft)-rank(decayRight) as A98 from feat context by tradetime
}
// Load the handle of the daily table and time the SQL implementation on it.
t = loadTable("dfs://k_day_level","k_day")
timer alpha98(t)
//alpha98 in matrix
// Row-wise rank of x, divided by the number of columns of x.
//   x: matrix to rank
// Returns the result of rowRank(x) divided element-wise by the column count.
def myrank(x){
    nCols = columns(x)
    return ratio(rowRank(x), nCols)
}
// Compute the alpha98 factor on panel data.
//   vwap, open, vol: inputs of identical shape (the script passes the matrices produced by panel)
// Returns myrank(left term) - myrank(right term), where each term is built from
// moving-window functions applied to the inputs.
def alphaPanel98(vwap, open, vol){
    // Moving averages of volume over 5 and 15 rows.
    adv5 = mavg(vol, 5)
    adv15 = mavg(vol, 15)
    // Left term: 5-row correlation of vwap with the 26-row sum of adv5, then mavg with window argument 1..7.
    vwapAdvCorr = mcorr(vwap, msum(adv5, 26), 5)
    leftTerm = myrank(mavg(vwapAdvCorr, 1..7))
    // Right term: 21-row correlation of the ranked open and ranked adv15, then
    // mimin over 9 rows, mrank over 7 rows, and mavg with window argument 1..8.
    rankCorr = mcorr(myrank(open), myrank(adv15), 21)
    rightTerm = myrank(mavg(mrank(9 - mimin(rankCorr, 9), true, 7), 1..8))
    return leftTerm - rightTerm
}
// Read the whole daily table into an in-memory table.
t = select * from loadTable("dfs://k_day_level","k_day")
// Time the conversion into three matrices built from tradetime, securityid and the
// vwap / open / vol columns (presumably rows = tradetime, columns = securityid - confirm against panel's documentation).
timer vwap, open, vol = panel(t.tradetime, t.securityid, [t.vwap, t.open, t.vol])
// Time the matrix implementation of alpha98.
timer res = alphaPanel98(vwap, open, vol)
//prepare data for python side
// Each metric is pivoted by tradetime and securityid and shared under the names vwap1, open1 and vol1.
vwap2 = select vwap from t pivot by tradetime, securityid
share vwap2 as vwap1
open2 = select open from t pivot by tradetime, securityid
share open2 as open1
vol2 = select vol from t pivot by tradetime, securityid
share vol2 as vol1