-
Notifications
You must be signed in to change notification settings - Fork 80
Expand file tree
/
Copy pathalarm.txt
More file actions
50 lines (41 loc) · 2.12 KB
/
alarm.txt
File metadata and controls
50 lines (41 loc) · 2.12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
login("admin", "123456")
//输入流数据表
st = streamTable(1000000:0, `deviceID`ts`temperature, [INT,DATETIME,FLOAT])
enableTableShareAndPersistence(st, `sensor, false, true, 1000000)
//创建报警表
share streamTable(1000:0, `time`deviceID`anomalyType`anomalyString, [DATETIME,INT,INT, SYMBOL]) as warningTable
//创建异常检测引擎,2分钟内,温度传感器出现2次40度以上,一次30度以上,报警
engine = createAnomalyDetectionEngine(name="engine1", metrics=<[sum(temperature > 40) > 2 && sum(temperature > 30) > 3 ]>, dummyTable=sensor, outputTable=warningTable, timeColumn=`ts,keyColumn=`deviceID ,windowSize= 120,step=30)
subscribeTable(tableName="sensor", actionName="sensorAnomalyDetection", offset=0, handler= append!{engine}, msgAsTable=true)
//5分钟无数据,报警
t=keyedTable(`deviceID,100:0, `deviceID`time, [INT,DATETIME])
deviceNum = 3
insert into t values(1..deviceNum, take(now().datetime(), deviceNum))
def checkNoData (mutable keyedTable, mutable outputTable, msg) {
keyedTable.append!(select deviceID, ts from msg)
warning = select now().datetime(), deviceID, 1 as anomalyType, "" as anomalyString from keyedTable where time < datetimeAdd(now().datetime(), -5, "m")
outputTable.append!(warning)
}
subscribeTable(tableName="sensor", actionName="noData", offset=0, handler=checkNoData{t, warningTable}, msgAsTable=true, batchSize=1000000, throttle=1)
//模拟写入流表
def writeData(){
deviceNum = 3
for (i in 0:60) {
data = table(take(1..deviceNum,deviceNum) as deviceID ,take(now().datetime(),deviceNum) as ts,rand(10..41,deviceNum) as temperature)
sensor.append!(data)
sleep(1000)
}
deviceNum = 2
for (i in 0:600) {
data = table(take(1..deviceNum,deviceNum) as deviceID ,take(now().datetime(),deviceNum) as ts,rand(10..45,deviceNum) as temperature)
sensor.append!(data)
sleep(1000)
}
}
submitJob("simulateData", "simulate sensor data", writeData)
//取消流表和订阅
dropAggregator("engine1")
unsubscribeTable(, "sensor", "sensorAnomalyDetection")
unsubscribeTable(, "sensor", "noData")
dropStreamTable(`sensor)
undef(`st, VAR)