-
Notifications
You must be signed in to change notification settings - Fork 80
Expand file tree
/
Copy pathdeviceState.txt
More file actions
97 lines (75 loc) · 3.07 KB
/
deviceState.txt
File metadata and controls
97 lines (75 loc) · 3.07 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
login(`admin,`123456)
try{loadPlugin(getHomeDir()+"/plugins/mqtt/PluginMQTTClient.txt")} catch(ex) {print(ex)}
go
use mqtt;
def clearEnv(){
try{
unsubscribeTable( tableName=`inputSt, actionName="monitor")
if(objs(true).name.find('inputSt')!=-1)
dropStreamTable(`inputSt)
if(objs(true).name.find('outputSt1')!=-1)
dropStreamTable(`outputSt1)
if(objs(true).name.find('outputSt2')!=-1)
dropStreamTable(`outputSt2)
if (getAggregatorStat().ReactiveStreamEngine[`name].find(`reactivEngine)!=-1)
dropAggregator(`reactivEngine)
if (getAggregatorStat().SessionWindowEngine[`name].find(`swEngine)!=-1)
dropAggregator(`swEngine)
for (id in mqtt::getSubscriberStat()[`subscriptionId])
mqtt::unsubscribe(id)
}catch(ex){
print(ex)
}
}
def createInOutTable(){
stream01=streamTable(100000:0,`tag`ts`value,[SYMBOL,TIMESTAMP, INT])
enableTableShareAndPersistence(table=stream01,tableName=`inputSt,asynWrite=false,compress=true, cacheSize=100000)
out1 =streamTable(10000:0,`tag`ts`value,[SYMBOL,TIMESTAMP, INT])
enableTableShareAndPersistence(table=out1,tableName=`outputSt1,asynWrite=false,compress=true, cacheSize=100000)
out2 =streamTable(10000:0,`ts`tag`lastValue,[TIMESTAMP,SYMBOL, INT])
enableTableShareAndPersistence(table=out2,tableName=`outputSt2,asynWrite=false,compress=true, cacheSize=100000)
}
def process(mutable engine1,mutable engine2,msg){
engine1.append!(msg)
engine2.append!(msg)
}
def consume(){
reactivEngine = createReactiveStateEngine(name=`reactivEngine, metrics=<[ts, value]>, dummyTable=objByName(`inputSt),outputTable= objByName(`outputSt1),keyColumn= "tag",filter=<value!=prev(value) && prev(value)!=NULL>)
swEngine = createSessionWindowEngine(name = "swEngine", sessionGap = 30000, metrics = < last(value)>, dummyTable = objByName(`inputSt), outputTable = objByName(`outputSt2), timeColumn = `ts, keyColumn=`tag,useSessionStartTime=false)
subscribeTable(tableName="inputSt", actionName="monitor", offset=0,
handler=process{swEngine,reactivEngine}, msgAsTable=true)
}
def writeStreamTable(host, port, topic){
//sp = mqtt::createCsvParser([SYMBOL,TIMESTAMP,INT])
sp = createJsonParser([SYMBOL,TIMESTAMP, INT], `tag`ts`value)
mqtt::subscribe(host, port, topic, sp, objByName(`inputSt))
}
def publishTableData(server,topic,f, batchsize,t){
conn=connect(server,1883,0,f,batchsize)
publish(conn,topic,t)
close(conn)
}
def main(){
clearEnv()
createInOutTable()
consume()
host="127.0.0.1"
port=1883
topic="sensor/test"
writeStreamTable(host, port, topic)
t=loadText(getHomeDir()+"/deviceState.csv")
//myFormat=take("", 3) f= mqtt::createCsvFormatter(myFormat, ',', ';')
f = createJsonFormatter()
batchsize=100
submitJob("submit_pub1", "submit_p1", publishTableData{host,topic,f, batchsize,t})
}
main()
getAggregatorStat().ReactiveStreamEngine
getAggregatorStat().SessionWindowEngine
getStreamingStat().pubTables
getStreamingStat().pubConns
getStreamingStat().subWorkers
mqtt::getSubscriberStat()
getRecentJobs()
select * from outputSt1 where tag=`motor.C17156B.m1
select * from outputSt2 where tag=`motor.C17156B.m1