-
Notifications
You must be signed in to change notification settings - Fork 80
Expand file tree
/
Copy pathdemo.dos
More file actions
210 lines (199 loc) · 9.36 KB
/
demo.dos
File metadata and controls
210 lines (199 loc) · 9.36 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
/* 使用Welch's method计算PSD功率谱密度
Welch's method是一种周期图法:
周期图法是将随机信号的N个采样点视作一个能量有限信号,傅里叶变换后,取幅值平方除以N,以此作为对真实功率谱的估计
Welch周期图法是修正的周期图功率谱密度的估计方法,它将信号分段加窗求其功率谱密度,然后做平均处理
*/
login("admin","123456")
undef all
try{loadPlugin("/hdd/hdd1/xyl/dolphindb/DolphinDB_2.00.8.12/server/plugins/signal/PluginSignal.txt");}catch(ex){}
go
def pwelch(data, window, noverlap, nfft, fs){
N = data.size();
M = nfft;
pfft = 10*log10(fs\nfft);
n_overlap = ceil(M * noverlap * 0.01); //重叠的点数,向上取整
L = floor((N - n_overlap ) / (M - n_overlap)); //计算分成了多少段数据
N_used = (M - n_overlap) * (L - 1) + M;
noise_used = data[0:(N_used)];
P_win = sum(window * window )/M; //窗函数能量计算
if (mod(nfft, 2) == 1){ //奇数
f = ((0 .. ((nfft + 1)/2)) * fs *1.0/ nfft);
fft_sum = array(DOUBLE, (nfft + 1)/2)
abs_fft_half = array(DOUBLE, (nfft + 1)/2)
}
else{ //偶数
f = ((0 .. (nfft / 2)) * fs *1.0/ nfft);
fft_sum = array(DOUBLE, (nfft/2 + 1))
abs_fft_half = array(DOUBLE, (nfft/2 + 1))
}
for(n in 1..L){ //计算每段中每个采样点fft变换后的赋值平方和
nstart = ( n - 1 )*(M - n_overlap);
temp_noise = noise_used[nstart : (nstart + M)] * window;
temp_fft = signal::mul(signal::fft(temp_noise), 1.0/nfft);
temp_fft_half = [temp_fft[0]]
temp_fft_half.append!(signal::mul(temp_fft[1:(nfft/2)], 2.0))
temp_fft_half.append!(temp_fft[nfft/2])
abs_fft_half[0] = pow(signal::abs(temp_fft_half[0]), 2.0);
end = pow(signal::abs(temp_fft_half[nfft/2]), 2);
abs_fft_half = abs_fft_half[0:1].append!(pow(signal::abs(temp_fft_half[1:(nfft/2)]), 2) * 0.5)
abs_fft_half.append!(end)
fft_sum = fft_sum + 1.0/ P_win * abs_fft_half;
}
fft_sum = fft_sum*1.0 / L //平均处理
return [pow(10, log10(fft_sum) - pfft*1.0 / 10), f] //PSD谱的纵坐标和横坐标
}
//hamming窗函数
def hamming(N){
return 0.54-0.46*cos(2*pi*(0..(N-1))/(N-1))
}
//sampling rate and time 定义采样频率和采样时间
fs = 1024; // Hz 返回的psd谱密度频率范围就是fs/2
// ts = 600; // second 10分钟
go
ts = 20;
go
//// sampling rate and bandwidth
nfft = 8192; //// modified
window = hamming(nfft); //// modified
noverlap = fs * 0; //// modified
bandwidthL = 1; //// modified
bandwidthH = 512; //// modified
sensitivity = 10; //// V/g
gain = 10;
N=ts*fs //原始数据放入引擎计算的每一段条数都要大于N,否则自动功率谱密度估计为0
//计算振动指标:rmsAcc 振动加速度均方根 rmsVel 振动速度均方根 rmsDis 振动位移均方根
defg rms(nose,N,sensitivity,gain,window, noverlap, nfft, fs,bandwidthL,bandwidthH){
if (size(nose)<N){
return 0.0,0.0,0.0 // 注意,这里必须为浮点数形式,否则引擎就会误认为输出为整型,反而会使结果精度消失
}
temp= nose/sensitivity/gain * 9.8 //电压信号改成加速度信号
temp=temp-mean(temp)
res=pwelch(temp, window, noverlap, nfft, fs) //psd谱
psdAcc=double(res[0]) // 加速度功率谱密度:(m/s^2)^2/HZ
f=double(res[1])
powAcc = psdAcc * f[1]
powVel = powAcc / square(2 *pi * f)
powDis = powVel / square(2 * pi * f)
resolution = fs*1.0 / nfft;
bandwidthLidx = int(bandwidthL / resolution) + 1;
bandwidthHidx = int(bandwidthH / resolution) + 1;
rmsAcc = sqrt(sum(powAcc[(int(bandwidthLidx) - 1):bandwidthHidx]))
rmsVel = sqrt(sum(powVel[(int(bandwidthLidx) - 1):bandwidthHidx]))*1000
rmsDis = sqrt(sum(powDis[(int(bandwidthLidx) - 1):bandwidthHidx]))*1000000
return rmsAcc, rmsVel, rmsDis
}
metrics=<[rms(signalnose,N,sensitivity,gain,window, noverlap, nfft, fs,bandwidthL,bandwidthH) as `rmsAcc`rmsVel`rmsDis]>
go
try{dropAggregator(`tsAggr1)}catch(ex){}
go
try{dropAggregator(`tsAggr2)}catch(ex){}
try{unsubscribeTable(tableName="signal", actionName="act_tsAggr1")}catch(ex){}
go
try{unsubscribeTable(tableName="srms", actionName="act_saveDfs1")}catch(ex){}
go
try{unsubscribeTable(tableName="srms", actionName="act_tsAggr2")}catch(ex){}
go
try{unsubscribeTable(tableName="warn", actionName="act_saveDfs2")}catch(ex){}
go
try{dropStreamTable("signal")}catch(ex){}
go
try{dropStreamTable("srms")}catch(ex){}
go
try{dropStreamTable("warn")}catch(ex){}
go
//输入表
t = streamTable(100:0, `timestamp`source`signalnose,[TIMESTAMP,SYMBOL,DOUBLE])
enableTableShareAndPersistence(table=t, tableName=`signal, cacheSize = 2000000)
go
//输出表
share streamTable(100:0, `datetime`source`rmsAcc`rmsVel`rmsDis,[TIMESTAMP,SYMBOL,DOUBLE,DOUBLE,DOUBLE]) as srms
//定义时序聚合引擎
tsAggr1 = createTimeSeriesAggregator(name="tsAggr1", windowSize=2*60*1000, step=2*60*1000, metrics=metrics, dummyTable=signal, outputTable=srms, timeColumn=`timestamp, keyColumn=`source)
subscribeTable(tableName="signal", actionName="act_tsAggr1", offset=0, handler=append!{tsAggr1}, msgAsTable=true);
go
//报警
share streamTable(100:0, `datetime`source`type`metric,[TIMESTAMP,SYMBOL,INT,STRING]) as warn
tsAggr2 = createAnomalyDetectionEngine(name="tsAggr2", metrics=<[rmsAcc > 0.055, rmsVel >0.32, rmsDis > 34.5]>, dummyTable=srms, outputTable=warn, timeColumn=`datetime, keyColumn=`source, windowSize=2*60*1000, step=2*60*1000)
subscribeTable(tableName="srms", actionName="act_tsAggr2", offset=0, handler=append!{tsAggr2}, msgAsTable=true);
go
//创建RMS分布式表,并订阅了落库
if(existsDatabase("dfs://rmsDB")){
dropDatabase("dfs://rmsDB")
}
db = database("dfs://rmsDB", VALUE, 2022.01.01..2022.12.31)
m = table(1:0,`datetime`source`rmsAcc`rmsVel`rmsDis,[TIMESTAMP,SYMBOL,DOUBLE,DOUBLE,DOUBLE])
db.createPartitionedTable(m, "rms", ["datetime"])
pt_rms = loadTable("dfs://rmsDB", "rms")
def saveSrmsToDFS(mutable dfsRMS, msg){
dfsRMS.append!(select datetime, source, rmsAcc as rmsAcc, rmsVel as rmsVel, rmsDis as rmsDis from msg)
}
subscribeTable(tableName="srms", actionName="act_saveDfs1", offset=-1, handler=saveSrmsToDFS{pt_rms}, msgAsTable=true, batchSize=10000, throttle=1);
//创建warnrms分布式表,并订阅了落库
if(existsDatabase("dfs://warnDB")){
dropDatabase("dfs://warnDB")
}
db = database("dfs://warnDB", VALUE, 2022.01.01..2022.12.31)
m = table(1:0,`datetime`source`type`metric,[TIMESTAMP,SYMBOL,INT,STRING])
db.createPartitionedTable(m, "warnrms", ["datetime"])
pt_warn = loadTable("dfs://warnDB", "warnrms")
def saveWarnToDFS(mutable dfswarnrms, msg){
dfswarnrms.append!(select datetime, source, type, metric from msg)
}
subscribeTable(tableName="warn", actionName="act_saveDfs2", offset=-1, handler=saveWarnToDFS{pt_warn}, msgAsTable=true, batchSize=10000, throttle=1);
//===============================================
//模拟实时生产数据
def fakedata(t){
source=`channel+string(1..16)
num=source.shape()[0]
n=1000*60*10
timestamp=now()
print(timestamp)
for (i in 1..n){
timestamp = timestamp+1
// print(timestamp)
time1=take(timestamp,num)
flag = rand([-1, 1], num)
signalnose =rand(1.,num)
signalnose = signalnose*flag
Table = table(time1 as timestamp, source, signalnose).sortBy!(`source`timestamp)
tableInsert(t, Table)
}
}
submitJob("fakedataplay","fakedataplayjob",fakedata,t)
getRecentJobs(1)
// 实际数据导入
def realdata(t)
{
dataPath="/hdd/hdd1/xyl/ShagnhaiWulianwang/TD06-29B000.csv"
Schema=extractTextSchema(dataPath)
// update Schema set type=`DOUBLE where name = `col16
data=loadText(dataPath,schema=Schema)
data = transpose(data)
deviceNum = 2;
n = 1000*60*10; // 10分钟
source=`channel0`channel1
tmp = table(100:0, `timestamp`source`signalnose,[TIMESTAMP,SYMBOL,DOUBLE])
timestamp = now();
for(i in 1..n){
timestamp=timestamp+1
time1=take(timestamp,deviceNum)
flag = rand([-1, 1], deviceNum);
signalnose = take([data[`col0][i], data[`col1][i]], deviceNum)
signalnose = signalnose*flag
Table = table(time1 as timestamp, source, signalnose).sortBy!(`source`timestamp)
tableInsert(t, Table)
}
}
submitJob("realdataplay","realdataplay",realdata,t)
cancelJob("fakedataplay202301300002")
getRecentJobs(1)
//grafana展示:psd谱 -> grafana
try{undef(`psd, SHARED)}catch(ex){}
data = select * from signal where source == "channel1" and timestamp >= 2023.01.29 03:54:21.652 and timestamp <= 2023.01.29 03:56:21.652 //通道1的振动信号数据
nose_ = data[`signalnose]
temp_= nose_/sensitivity/gain * 9.8
temp_=temp_-mean(temp_)
res_=pwelch(temp_, window, noverlap, nfft, fs)
share table(res_[1] as f, res_[0] as psdvalue) as psd //共享内存表供grafana查看
select * from srms where source = `channel1
select * from warn where source = `channel1