-
Notifications
You must be signed in to change notification settings - Fork 80
Expand file tree
/
Copy pathparallelLoad.dos
More file actions
58 lines (51 loc) · 1.71 KB
/
parallelLoad.dos
File metadata and controls
58 lines (51 loc) · 1.71 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
//多个CSV文件并行导入
login(`admin,`123456)
//建库建表
if (existsDatabase("dfs://sh_entrust"))
{
dropDatabase("dfs://sh_entrust")
}
create database "dfs://sh_entrust" partitioned by VALUE(2022.01.01..2022.01.03), HASH([SYMBOL, 10]), engine='TSDB'
create table "dfs://sh_entrust"."entrust"(
SecurityID SYMBOL,
TransactTime TIMESTAMP,
valOrderNoue INT,
Price DOUBLE,
Balance INT,
OrderBSFlag SYMBOL,
OrdType SYMBOL,
OrderIndex INT,
ChannelNo INT,
BizIndex INT)
partitioned by TransactTime,SecurityID,
sortColumns = [`SecurityID,`TransactTime]
//定义类型转换函数
def transType(mutable memTable)
{
return memTable.replaceColumn!(`col0,lpad(string(memTable.col0),6,`0)).replaceColumn!(`col1,datetimeParse(string(memTable.col1),"yyyyMMddHHmmssSSS")).replaceColumn!(`col5,string(memTable.col5)).replaceColumn!(`col6,string(memTable.col6))
}
//定义导入一天数据的函数
def loadOneDayFile(db,table,filePath)
{
csvFiles = exec filename from files(filePath) where filename like"%.csv"
for(csvIdx in csvFiles)
{
loadTextEx(dbHandle = db, tableName = table, partitionColumns = `col1`col0, filename = filePath + "/" + csvIdx, transform = transType, skipRows = 1)
}
}
//定义提交并行任务的函数
def parallelLoad(allFileContents)
{
db = database("dfs://sh_entrust")
table = `entrust
dateFiles = exec filename from files(allFileContents) where isDir = true
for(dateIdx in dateFiles)
{
submitJob("parallelLoad" + dateIdx,"parallelLoad",loadOneDayFile{db,table,},allFileContents + "/" + dateIdx)
}
}
//调用函数,提交任务,对应的目录根据实际情况修改
allFileContents = "/home/ychan/data/loadForPoc/SH/Order"
parallelLoad(allFileContents)
//查看任务执行状态
getRecentJobs()