Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 24 additions & 6 deletions bumble/sdp.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import asyncio
import logging
import struct
import threading
from collections.abc import Iterable, Sequence
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any, ClassVar, NewType, TypeVar
Expand All @@ -44,6 +45,13 @@
logger = logging.getLogger(__name__)


# SDP data elements are nested (SEQUENCE, ALTERNATIVE). Cap parse recursion to
# prevent a malicious peer from crashing the process via a deeply nested PDU.
# 32 levels is well beyond anything a legitimate service record uses.
_MAX_DATA_ELEMENT_NESTING = 32
_parse_state = threading.local()
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not prefer to use a global threading local, but we can leave it here and refactor later.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Acknowledged — leaving the module-level threading.local() as-is per your suggestion; happy to refactor it into a context-scoped helper in a follow-up PR if/when you'd like.



# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
Expand Down Expand Up @@ -284,12 +292,22 @@ def signed_integer_from_bytes(data):

@staticmethod
def list_from_bytes(data):
elements = []
while data:
element = DataElement.from_bytes(data)
elements.append(element)
data = data[len(bytes(element)) :]
return elements
depth = getattr(_parse_state, "depth", 0)
if depth >= _MAX_DATA_ELEMENT_NESTING:
raise InvalidPacketError(
f"SDP data element nesting exceeds max depth "
f"({_MAX_DATA_ELEMENT_NESTING})"
)
_parse_state.depth = depth + 1
try:
elements = []
while data:
element = DataElement.from_bytes(data)
elements.append(element)
data = data[len(bytes(element)) :]
return elements
finally:
_parse_state.depth = depth

@staticmethod
def parse_from_bytes(data, offset):
Expand Down
40 changes: 40 additions & 0 deletions tests/sdp_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -440,3 +440,43 @@ async def run():
if __name__ == '__main__':
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
asyncio.run(run())


# -----------------------------------------------------------------------------
def test_nested_sequence_recursion_guard():
"""Regression test: deeply-nested SDP SEQUENCE/ALTERNATIVE must not crash
the parser with RecursionError. Instead a ValueError is raised once the
configured nesting limit is exceeded.

Root cause: DataElement.from_bytes -> list_from_bytes -> (constructor
dispatching back to list_from_bytes for SEQUENCE/ALTERNATIVE) recursed
without a depth limit. A malicious SDP peer could craft a PDU exceeding
Pythons default recursion limit (~1000 frames) and crash the host.
"""
# Build nested SEQUENCE payload with tag 0x36 (SEQUENCE, 2-byte length).
inner = b"\x35\x00" # empty SEQUENCE terminator
for _ in range(1500):
size = len(inner)
if size >= 65535:
break
inner = bytes([0x36, (size >> 8) & 0xFF, size & 0xFF]) + inner

with pytest.raises(ValueError, match="nesting exceeds max depth"):
DataElement.from_bytes(inner)


def test_nested_sequence_within_limit_still_works():
"""Nested-but-reasonable SDP SEQUENCEs must still parse correctly."""
leaf = DataElement.unsigned_integer(1, value_size=2)
payload = leaf
for _ in range(16): # under the 32-depth limit
payload = DataElement.sequence([payload])
raw = bytes(payload)
parsed = DataElement.from_bytes(raw)
# Walk back down to confirm structural integrity preserved
cur = parsed
for _ in range(16):
assert cur.type == DataElement.SEQUENCE
cur = cur.value[0]
assert cur.type == DataElement.UNSIGNED_INTEGER
assert cur.value == 1
Loading