-
Notifications
You must be signed in to change notification settings - Fork 28
Expand file tree
/
Copy pathclient.go
More file actions
181 lines (153 loc) · 4.77 KB
/
client.go
File metadata and controls
181 lines (153 loc) · 4.77 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
package topic
import (
"crypto/tls"
"crypto/x509"
"io/ioutil"
"strings"
"time"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/beats/v7/metricbeat/mb"
MQTT "github.com/eclipse/paho.mqtt.golang"
)
// Package-level state shared by the MQTT setup code and the client callbacks
// defined in this file.
var (
	// client is the MQTT client created by setupMqttClient.
	client MQTT.Client

	// connected records whether connect has established a broker session.
	connected bool

	// config points at the MetricSet handed to setupMqttClient; the callbacks
	// read their settings from it.
	config *MetricSet

	// reporter is not referenced in this file.
	// NOTE(review): presumably assigned by the metricset code — confirm.
	reporter mb.ReporterV2

	// events carries the events built by onMessage.
	events chan mb.Event
)
// NewTLSConfig builds the TLS configuration for the broker connection from the
// package-level config.
//
// When config.CA is set, the PEM file it names is loaded into the root CA
// pool. A file that cannot be read, or that yields no usable certificate, is
// reported with a warning and the pool is left as it is. When both
// config.ClientCert and config.ClientKey are set, the key pair is loaded and
// offered as the client certificate; a key pair that cannot be loaded panics.
func NewTLSConfig() *tls.Config {
	certpool := x509.NewCertPool()
	if config.CA != "" {
		logp.Info("[MQTT] Set the CA")
		pemCerts, err := ioutil.ReadFile(config.CA)
		switch {
		case err != nil:
			logp.Warn("[MQTT] Unable to read CA file %s: %v", config.CA, err)
		case !certpool.AppendCertsFromPEM(pemCerts):
			logp.Warn("[MQTT] No certificate could be parsed from CA file %s", config.CA)
		}
	}

	tlsconfig := &tls.Config{
		// RootCAs = certs used to verify server cert.
		RootCAs: certpool,
		// NOTE(review): InsecureSkipVerify disables verification of the
		// server's certificate chain and host name, so RootCAs is not
		// enforced while it is true. Kept unchanged for backward
		// compatibility; consider making it configurable.
		InsecureSkipVerify: true,
	}

	// Import client certificate/key pair.
	if config.ClientCert != "" && config.ClientKey != "" {
		logp.Info("[MQTT] Set the Certs")
		cert, err := tls.LoadX509KeyPair(config.ClientCert, config.ClientKey)
		if err != nil {
			// The signature has no error return, so a broken key pair
			// stops the caller here.
			panic(err)
		}
		// Certificates = list of certs client sends to server.
		tlsconfig.Certificates = []tls.Certificate{cert}
	}

	return tlsconfig
}
// setupMqttClient stores m as the package-level config, builds the MQTT client
// options from its settings, creates the package-level client and starts the
// first connection attempt.
func setupMqttClient(m *MetricSet) {
	logp.Info("[MQTT] Connect to broker URL: %s", m.BrokerURL)
	config = m

	opts := MQTT.NewClientOptions().
		SetClientID(m.ClientID).
		AddBroker(m.BrokerURL).
		SetMaxReconnectInterval(time.Second).
		SetConnectionLostHandler(reConnectHandler).
		SetOnConnectHandler(subscribeOnConnect).
		SetAutoReconnect(true)

	// Credentials are optional; each one is only set when configured.
	if user := m.BrokerUsername; user != "" {
		logp.Info("[MQTT] Broker username: %s", user)
		opts.SetUsername(user)
	}
	if pass := m.BrokerPassword; pass != "" {
		opts.SetPassword(pass)
	}

	if m.SSL {
		logp.Info("[MQTT] Configure session to use SSL")
		opts.SetTLSConfig(NewTLSConfig())
	}

	client = MQTT.NewClient(opts)
	connect(client)
}
// connect tries to connect client to the broker until an attempt succeeds.
// It does nothing when the package-level connected flag is already set.
//
// Failed attempts are retried in a loop rather than by re-entering the
// function, so the call stack does not grow while the broker is unreachable.
// Each failure logs the same two messages as before and waits 5 seconds.
func connect(client MQTT.Client) {
	for !connected {
		token := client.Connect()
		token.Wait()
		if err := token.Error(); err != nil {
			logp.Info("Failed to connect to broker, waiting 5 seconds and retrying")
			time.Sleep(5 * time.Second)
			logp.Warn("[MQTT] Connection lost: %s", err.Error())
			continue
		}

		connected = client.IsConnected()
		logp.Info("MQTT Client connected: %t", client.IsConnected())
		// One successful Connect ends the retries, whatever IsConnected
		// reported.
		return
	}
}
// subscribeOnConnect is registered as the client's on-connect handler. It
// subscribes to every topic in config.TopicsSubscribe with config.QoS and
// binds onMessage as the message handler. A failed subscription panics.
//
// The events channel is allocated once, before subscribing. onMessage
// therefore never sends on a nil channel, and a reconnect neither drops
// buffered events nor leaves a reader waiting on a replaced channel.
func subscribeOnConnect(client MQTT.Client) {
	if events == nil {
		events = make(chan mb.Event, 500)
	}

	subscriptions := ParseTopics(config.TopicsSubscribe, config.QoS)

	// Mqtt client - Subscribe to every topic in the config file, and bind with message handler
	if token := client.SubscribeMultiple(subscriptions, onMessage); token.Wait() && token.Error() != nil {
		panic(token.Error())
	}
	logp.Info("Subscribed to configured topics")
}
// onMessage is the MQTT message handler. It converts a received message into
// an mb.Event and sends it on the package-level events channel.
//
// With config.LegacyFields the module fields receive the topic, a flag telling
// whether the topic starts with "$", and a "message" object holding the
// content, ID and retained flag. With config.ECSFields the root fields receive
// event.creation, event.dataset and message.
func onMessage(client MQTT.Client, msg MQTT.Message) {
	payload := string(msg.Payload())
	logp.Debug("MQTT", "MQTT message received: %s", payload)

	moduleFields := mapstr.M{}
	rootFields := mapstr.M{}

	if config.LegacyFields {
		topic := msg.Topic()
		moduleFields["isSystemTopic"] = strings.HasPrefix(topic, "$")
		moduleFields["topic"] = topic
		moduleFields["message"] = mapstr.M{
			"content":  payload,
			"ID":       msg.MessageID(),
			"retained": msg.Retained(),
		}
	}

	if config.ECSFields {
		rootFields.Put("event.creation", time.Now())
		rootFields.Put("event.dataset", msg.Topic())
		rootFields.Put("message", payload)
	}

	// Hand the event over to whoever reads the events channel.
	events <- mb.Event{
		RootFields:   rootFields,
		ModuleFields: moduleFields,
	}
	logp.Debug("MQTT", "Event sent: %t", true)
}
// reConnectHandler is registered as the client's connection-lost handler in
// setupMqttClient. It logs the reason, clears the connected flag and starts a
// new connection attempt through connect.
func reConnectHandler(client MQTT.Client, reason error) {
	cause := reason.Error()
	logp.Warn("[MQTT] Connection lost: %s", cause)

	connected = false
	connect(client)
}
// ParseTopics returns a topic-to-QoS map in which every entry of topics is
// associated with qos converted to a byte. Each topic is logged as it is
// added; a topic listed more than once yields a single map entry.
func ParseTopics(topics []string, qos int) map[string]byte {
	level := byte(qos)
	subscriptions := make(map[string]byte, len(topics))
	for i := range topics {
		topic := topics[i]
		subscriptions[topic] = level
		logp.Info("Subscribe to %v with QoS %v", topic, qos)
	}
	return subscriptions
}