Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions config/nmqtt.conf
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,23 @@ allow_anonymous = true
#
#password_file =

#
# The absolute path to the ACL (Access Control List) file. This uses
# Mosquitto-compatible ACL format to restrict which users can publish
# and subscribe to which topics.
#
# The ACL file supports:
# user <username> - Start a user-specific rule section
# topic [read|write|readwrite] <topic> - Allow access to a topic
# pattern [read|write|readwrite] <topic> - Pattern rules for all users
#
# Pattern topics support %u (username) and %c (client ID) substitution.
# MQTT wildcards (# and +) are supported in topic filters.
#
# If no ACL file is specified, all authenticated users have full access.
#
#acl_file =

#
# To activate a SSL connection you need a certificate and a private key.
# When you specify both of them, the connection will automatic be using
Expand Down
102 changes: 72 additions & 30 deletions nmqtt.nim
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ when defined(broker):
times,
random,
nmqtt/utils/passwords,
nmqtt/utils/version
nmqtt/utils/version,
nmqtt/utils/acl,
nmqtt/utils/trie
from parsecfg import loadConfig, getSectionValue
from os import fileExists

Expand Down Expand Up @@ -142,6 +144,7 @@ when defined(broker):
connections: Table[string, MqttCtx]
retained: Table[string, RetainedMsg] # Topic, RetaindMsg
subscribers: Table[string, seq[MqttCtx]]
subTrie: TopicTrie # Trie index for O(depth) subscription matching
version: uint8
clientIdMaxLen: int
clientKickOld: bool
Expand All @@ -150,6 +153,7 @@ when defined(broker):
passClientId: bool
maxConnections: int
passwords: Table[string, string]
acl: AclStore

RetainedMsg = object
msg: string
Expand All @@ -159,7 +163,7 @@ when defined(broker):

when defined(broker):
var
mqttbroker = MqttBroker()
mqttbroker = MqttBroker(acl: newAclStore(), subTrie: newTopicTrie())
r = initRand(toInt(epochTime()))


Expand Down Expand Up @@ -289,6 +293,8 @@ when defined(broker):
mqttbroker.subscribers[topic].insert(ctx)
else:
mqttbroker.subscribers[topic] = @[ctx]
# First subscriber on this filter, add to the trie index
mqttbroker.subTrie.subscribe(topic)
except:
wrn("Crash when adding a new subcriber")

Expand All @@ -298,6 +304,9 @@ when defined(broker):
try:
if mqttbroker.subscribers.hasKey(topic):
mqttbroker.subscribers[topic] = filter(mqttbroker.subscribers[topic], proc(x: MqttCtx): bool = x != ctx)
if mqttbroker.subscribers[topic].len() == 0:
mqttbroker.subscribers.del(topic)
mqttbroker.subTrie.unsubscribe(topic)
except:
wrn("Crash when removing subscriber with specific topic")

Expand All @@ -314,6 +323,7 @@ when defined(broker):

for t in delTop:
mqttbroker.subscribers.del(t)
mqttbroker.subTrie.unsubscribe(t)

when defined(broker):
proc qosAlign(qP, qS: uint8): uint8 =
Expand Down Expand Up @@ -641,20 +651,22 @@ when defined(broker):
await c.work()

when defined(broker):
proc publishToSubscribers(seqctx: seq[MqttCtx], pkt: Pkt, topic, message: string, qos: uint8, retain: bool, senderId: string) {.async.} =
## Publish async to clients
proc publishToSubscribers(seqctx: seq[MqttCtx], pkt: Pkt, subFilter, pubTopic, message: string, qos: uint8, retain: bool, senderId: string) {.async.} =
## Publish async to clients.
## `subFilter` is the subscription key (e.g. "topic/subtopic/#") for QoS lookup.
## `pubTopic` is the actual published topic (e.g. "topic/subtopic/specific") sent to the client.
for c in seqctx:
if c.state != Connected:
asyncCheck removeSubscriber(c, topic)
asyncCheck removeSubscriber(c, subFilter)
continue
let
msgId = c.nextMsgId()
qosSub = qosAlign(qos, c.subscribed[topic])
qosSub = qosAlign(qos, c.subscribed[subFilter])

if mqttbroker.passClientId:
c.workQueue[msgId] = Work(wk: PubWork, msgId: msgId, topic: topic, qos: qosSub, retain: retain, message: senderId & ":" & message, typ: Publish)
c.workQueue[msgId] = Work(wk: PubWork, msgId: msgId, topic: pubTopic, qos: qosSub, retain: retain, message: senderId & ":" & message, typ: Publish)
else:
c.workQueue[msgId] = Work(wk: PubWork, msgId: msgId, topic: topic, qos: qosSub, retain: retain, message: message, typ: Publish)
c.workQueue[msgId] = Work(wk: PubWork, msgId: msgId, topic: pubTopic, qos: qosSub, retain: retain, message: message, typ: Publish)
await c.work()

when defined(broker):
Expand Down Expand Up @@ -810,12 +822,25 @@ proc onPublish(ctx: MqttCtx, pkt: Pkt) {.async.} =
(message, offset) = pkt.getstring(offset, false)

when defined(broker):
# Send message to all subscribers on "#"
if mqttbroker.subscribers.hasKey("#"):
await publishToSubscribers(mqttbroker.subscribers["#"], pkt, "#", message, qos, retain, ctx.clientid)
# Send message to all subscribers on _the topic_
if mqttbroker.subscribers.hasKey(topic):
await publishToSubscribers(mqttbroker.subscribers[topic], pkt, topic, message, qos, retain, ctx.clientid)
# Check ACL: does the publishing client have write access?
if not mqttbroker.acl.checkPublish(ctx.username, ctx.clientId, topic):
if mqttbroker.verbosity >= 1:
verbose("ACL >> " & ctx.clientId & " denied publish to " & topic)
# Per MQTT v3.1.1, the broker silently drops the message but still
# completes the QoS handshake so the client doesn't stall.
if qos == 1:
ctx.workQueue[msgId] = Work(wk: PubWork, msgId: msgId, state: WorkNew, qos: 1, typ: PubAck)
await ctx.work()
elif qos == 2:
ctx.workQueue[msgId] = Work(wk: PubWork, msgId: msgId, state: WorkNew, qos: 2, typ: PubRec)
await ctx.work()
return

# Route the published message to all matching subscribers.
# Uses the trie index for O(topic_depth) matching instead of O(n) scan.
for subFilter in mqttbroker.subTrie.matchingFilters(topic):
if mqttbroker.subscribers.hasKey(subFilter):
await publishToSubscribers(mqttbroker.subscribers[subFilter], pkt, subFilter, topic, message, qos, retain, ctx.clientid)

if mqttbroker.verbosity >= 1:
verbose("Client >> " & ctx.clientId & " has published a message")
Expand Down Expand Up @@ -869,36 +894,44 @@ proc onPublish(ctx: MqttCtx, pkt: Pkt) {.async.} =

proc onPubAck(ctx: MqttCtx, pkt: Pkt) {.async.} =
let (msgId, _) = pkt.getu16(0)
assert msgId in ctx.workQueue
assert ctx.workQueue[msgId].wk == PubWork
assert ctx.workQueue[msgId].state == WorkSent
assert ctx.workQueue[msgId].qos == 1
if msgId notin ctx.workQueue:
ctx.dmp "PubAck for unknown msgId: " & $msgId
return
if ctx.workQueue[msgId].wk != PubWork or ctx.workQueue[msgId].qos != 1:
ctx.dmp "PubAck unexpected state for msgId: " & $msgId
return
ctx.workQueue.del msgId

proc onPubRec(ctx: MqttCtx, pkt: Pkt) {.async.} =
let (msgId, _) = pkt.getu16(0)
assert msgId in ctx.workQueue
assert ctx.workQueue[msgId].wk == PubWork
assert ctx.workQueue[msgId].state == WorkSent
assert ctx.workQueue[msgId].qos == 2
if msgId notin ctx.workQueue:
ctx.dmp "PubRec for unknown msgId: " & $msgId
return
if ctx.workQueue[msgId].wk != PubWork or ctx.workQueue[msgId].qos != 2:
ctx.dmp "PubRec unexpected state for msgId: " & $msgId
return
ctx.workQueue[msgId] = Work(wk: PubWork, msgId: msgId, state: WorkNew, qos: 2, typ: PubRel)
await ctx.work()

proc onPubRel(ctx: MqttCtx, pkt: Pkt) {.async.} =
let (msgId, _) = pkt.getu16(0)
assert msgId in ctx.workQueue
assert ctx.workQueue[msgId].wk == PubWork
assert ctx.workQueue[msgId].state == WorkSent
assert ctx.workQueue[msgId].qos == 2
if msgId notin ctx.workQueue:
ctx.dmp "PubRel for unknown msgId: " & $msgId
return
if ctx.workQueue[msgId].wk != PubWork or ctx.workQueue[msgId].qos != 2:
ctx.dmp "PubRel unexpected state for msgId: " & $msgId
return
ctx.workQueue[msgId] = Work(wk: PubWork, msgId: msgId, state: WorkNew, qos: 2, typ: PubComp)
await ctx.work()

proc onPubComp(ctx: MqttCtx, pkt: Pkt) {.async.} =
let (msgId, _) = pkt.getu16(0)
assert msgId in ctx.workQueue
assert ctx.workQueue[msgId].wk == PubWork
assert ctx.workQueue[msgId].state == WorkSent
assert ctx.workQueue[msgId].qos == 2
if msgId notin ctx.workQueue:
ctx.dmp "PubComp for unknown msgId: " & $msgId
return
if ctx.workQueue[msgId].wk != PubWork or ctx.workQueue[msgId].qos != 2:
ctx.dmp "PubComp unexpected state for msgId: " & $msgId
return
ctx.workQueue.del msgId

#when defined(broker):
Expand All @@ -921,6 +954,15 @@ proc onSubscribe(ctx: MqttCtx, pkt: Pkt) {.async.} =
(topic, offset) = pkt.getstring(offset, parseInt($nextLen))
(qos, offset) = pkt.getu8(offset)

# Check ACL: does the client have read access for this topic?
if not mqttbroker.acl.checkSubscribe(ctx.username, ctx.clientId, topic):
if mqttbroker.verbosity >= 1:
verbose("ACL >> " & ctx.clientId & " denied subscribe to " & topic)
# Send SubAck with failure return code (0x80) per MQTT v3.1.1 spec
ctx.workQueue[msgId] = Work(wk: PubWork, msgId: msgId, state: WorkNew, qos: 0, typ: SubAck)
await ctx.work()
return

ctx.subscribed[topic] = qos
await addSubscriber(ctx, topic)

Expand Down
23 changes: 22 additions & 1 deletion nmqtt/nmqtt.nim
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,12 @@ OPTIONS:
ClientID in payload: $11
Client kick old: $12
Number of passwords: $13
ACL loaded: $14

""".format(nmqttVersion, mb.host, mb.port, mb.sslOn, now(), mb.verbosity,
mb.maxConnections, mb.clientIdMaxLen, mb.spacesInClientId,
mb.emptyClientId, mb.passClientId, mb.clientKickOld, mb.passwords.len()
mb.emptyClientId, mb.passClientId, mb.clientKickOld, mb.passwords.len(),
mb.acl.loaded
)


Expand Down Expand Up @@ -194,6 +196,16 @@ proc loadConf(mb: MqttBroker, config: string) =
let passwordFile = dict.getSectionValue("","password_file")
loadPasswords(passwordFile)

# Load ACL file if specified
let aclFile = dict.getSectionValue("","acl_file")
if aclFile != "":
if not fileExists(aclFile):
echo "\nACL file does not exist..\n - " & aclFile
quit()
mqttbroker.acl.loadAclFile(aclFile)
if mqttbroker.verbosity >= 1:
echo "ACL loaded from: " & aclFile


proc handler() {.noconv.} =
## Catch ctrl+c from user
Expand All @@ -206,6 +218,7 @@ proc handler() {.noconv.} =
proc nmqttBroker(config="", host="127.0.0.1", port=1883, verbosity=0, max_conn=0,
clientid_maxlen=60, clientid_spaces=false, clientid_empty=false,
client_kickold=false, clientid_pass=false, password_file="",
acl_file="",
ssl=false, ssl_cert="", ssl_key=""
) {.async.} =
## CLI tool for a MQTT broker
Expand All @@ -226,6 +239,12 @@ proc nmqttBroker(config="", host="127.0.0.1", port=1883, verbosity=0, max_conn=0
if password_file != "":
loadPasswords(password_file)

if acl_file != "":
if not fileExists(acl_file):
echo "\nACL file does not exist..\n - " & acl_file
quit()
mqttbroker.acl.loadAclFile(acl_file)

if ssl:
mqttbroker.sslOn = true
mqttbroker.sslCert = ssl_cert
Expand Down Expand Up @@ -282,13 +301,15 @@ $options
"client-kickold": "kick old client, if new client has same clientid. Defaults to false.",
"clientid-pass": "pass clientid in payload {clientid:payload}. Defaults to false.",
"password-file": "absolute path to the password file",
"acl-file": "absolute path to the ACL file (Mosquitto-compatible format)",
"ssl": "activate ssl for the broker - requires --ssl-cert and --ssl-key.",
"ssl-cert": "absolute path to the ssl certificate.",
"ssl-key": "absolute path to the ssl key."
},
short={
"help": '?',
"max-conn": '\0',
"acl-file": '\0',
"ssl": '\0',
"ssl-cert": '\0',
"ssl-key": '\0'
Expand Down
Loading