Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 39 additions & 5 deletions e2e/helpers/mock_rtsp.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def __init__(
sdp_control: str = "*",
content_base: str | None = "auto",
custom_sdp: str | None = None,
options_session_id: str | None = None,
host: str = "127.0.0.1",
):
"""
Expand All @@ -40,13 +41,17 @@ def __init__(
for relative controls); ``None`` omits the header entirely;
any other string is sent verbatim.
custom_sdp: If set, replaces the auto-generated SDP body.
options_session_id: If set, OPTIONS responds with this Session ID
and every subsequent request (DESCRIBE, SETUP, PLAY, ...)
must echo it (simulates HMS-style servers).
host: Address to listen on (use "::1" for IPv6 loopback).
"""
self.host = host
self.port = port or find_free_port(host)
self._sdp_control = sdp_control
self._content_base = content_base
self._custom_sdp = custom_sdp
self._options_session_id = options_session_id
self._server_sock: socket.socket | None = None
self._thread: threading.Thread | None = None
self._stop = threading.Event()
Expand Down Expand Up @@ -82,6 +87,10 @@ def _after_play(self, conn: socket.socket, addr: tuple) -> None:
"""Called right after the PLAY 200 OK is sent. Pump data here."""
raise NotImplementedError

def _session_id(self) -> str:
"""Session ID returned by OPTIONS/SETUP/PLAY (HMS uses OPTIONS session)."""
return self._options_session_id or "t1"

# -- internals -----------------------------------------------------------

def _accept(self) -> None:
Expand Down Expand Up @@ -131,11 +140,24 @@ def _handle(self, conn: socket.socket, addr: tuple) -> None:
}
)

# HMS-style servers assign the session at OPTIONS time and
# close the connection if any later request doesn't echo it.
if (
self._options_session_id
and method != "OPTIONS"
and req_headers_map.get("Session") != self._options_session_id
):
return

if method == "OPTIONS":
session_line = ""
if self._options_session_id:
session_line = "Session: %s\r\n" % self._options_session_id
conn.sendall(
(
"RTSP/1.0 200 OK\r\nCSeq: %s\r\n"
"Public: OPTIONS, DESCRIBE, SETUP, PLAY, TEARDOWN\r\n\r\n" % cseq
"%s"
"Public: OPTIONS, DESCRIBE, SETUP, PLAY, TEARDOWN\r\n\r\n" % (cseq, session_line)
).encode()
)
elif method == "DESCRIBE":
Expand Down Expand Up @@ -172,11 +194,15 @@ def _handle(self, conn: socket.socket, addr: tuple) -> None:
elif method == "SETUP":
conn.sendall(self._setup_response(cseq, transport_hdr).encode())
elif method == "PLAY":
conn.sendall(("RTSP/1.0 200 OK\r\nCSeq: %s\r\nSession: t1\r\n\r\n" % cseq).encode())
conn.sendall(
("RTSP/1.0 200 OK\r\nCSeq: %s\r\nSession: %s\r\n\r\n" % (cseq, self._session_id())).encode()
)
self._after_play(conn, addr)
return
elif method == "TEARDOWN":
conn.sendall(("RTSP/1.0 200 OK\r\nCSeq: %s\r\nSession: t1\r\n\r\n" % cseq).encode())
conn.sendall(
("RTSP/1.0 200 OK\r\nCSeq: %s\r\nSession: %s\r\n\r\n" % (cseq, self._session_id())).encode()
)
return
except socket.timeout, ConnectionError, OSError:
pass
Expand Down Expand Up @@ -206,16 +232,24 @@ def __init__(
sdp_control: str = "*",
content_base: str | None = "auto",
custom_sdp: str | None = None,
options_session_id: str | None = None,
host: str = "127.0.0.1",
):
super().__init__(port, sdp_control=sdp_control, content_base=content_base, custom_sdp=custom_sdp, host=host)
super().__init__(
port,
sdp_control=sdp_control,
content_base=content_base,
custom_sdp=custom_sdp,
options_session_id=options_session_id,
host=host,
)
self._num_packets = num_packets

def _setup_response(self, cseq: str, transport_hdr: str) -> str:
return (
"RTSP/1.0 200 OK\r\nCSeq: %s\r\n"
"Transport: RTP/AVP/TCP;unicast;interleaved=0-1\r\n"
"Session: t1\r\n\r\n" % cseq
"Session: %s\r\n\r\n" % (cseq, self._session_id())
)

def _after_play(self, conn: socket.socket, addr: tuple) -> None:
Expand Down
24 changes: 24 additions & 0 deletions e2e/test_rtsp_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,30 @@ def test_tcp_protocol_handshake(self, shared_r2h):
finally:
rtsp.stop()

def test_requests_echo_options_session(self, shared_r2h):
"""HMS-style servers return Session in OPTIONS; all subsequent
requests (DESCRIBE, SETUP, PLAY) must echo it."""
session_id = "2728486233"
rtsp = MockRTSPServer(num_packets=200, options_session_id=session_id)
rtsp.start()
try:
status, _, body = stream_get(
"127.0.0.1",
shared_r2h.port,
"/rtsp/127.0.0.1:%d/stream" % rtsp.port,
read_bytes=4096,
timeout=_STREAM_TIMEOUT,
)
assert status == 200, "Expected stream to succeed when requests carry Session"
assert len(body) > 0

assert rtsp.requests_received.index("OPTIONS") < rtsp.requests_received.index("DESCRIBE")
for method in ("DESCRIBE", "SETUP", "PLAY"):
req = next(r for r in rtsp.requests_detailed if r["method"] == method)
assert req["headers"].get("Session") == session_id, "%s must echo the OPTIONS Session" % method
finally:
rtsp.stop()


# ===================================================================
# UDP transport
Expand Down
48 changes: 26 additions & 22 deletions src/rtsp.c
Original file line number Diff line number Diff line change
Expand Up @@ -1126,15 +1126,27 @@ static int rtsp_prepare_request(rtsp_session_t *session, const char *method, con
}
}

/* RFC 2326 section 12.37: once a session has been established, every
* request within that session must carry the Session header. Some servers
* (e.g. Huawei HMS edge nodes) assign the session ID as early as the
* OPTIONS response and close the connection if subsequent requests
* (DESCRIBE, SETUP, ...) do not echo it, so inject it centrally here. */
char session_header[RTSP_SESSION_ID_SIZE + 16] = "";
if (session->session_id[0] != '\0') {
snprintf(session_header, sizeof(session_header), "Session: %s\r\n", session->session_id);
}

/* Build RTSP request */
int len = snprintf(session->pending_request, sizeof(session->pending_request),
"%s %s %s\r\n"
"CSeq: %u\r\n"
"User-Agent: %s\r\n"
"%s"
"%s"
"%s"
"\r\n",
method, request_url, RTSP_VERSION, session->cseq++, rtsp_get_user_agent(), auth_header, extra);
method, request_url, RTSP_VERSION, session->cseq++, rtsp_get_user_agent(), session_header,
auth_header, extra);

if (len < 0 || len >= (int)sizeof(session->pending_request)) {
logger(LOG_ERROR, "RTSP: Request buffer overflow");
Expand Down Expand Up @@ -1166,16 +1178,10 @@ int rtsp_send_keepalive(rtsp_session_t *session) {
return 1; /* Busy with another request */
}

char extra_headers[RTSP_HEADERS_BUFFER_SIZE];
if (snprintf(extra_headers, sizeof(extra_headers), "Session: %s\r\n", session->session_id) >=
(int)sizeof(extra_headers)) {
logger(LOG_ERROR, "RTSP: Failed to format keepalive headers");
return -1;
}

/* Use GET_PARAMETER if supported, otherwise use OPTIONS */
/* Use GET_PARAMETER if supported, otherwise use OPTIONS
* (Session header is injected automatically by rtsp_prepare_request) */
const char *method = session->use_get_parameter ? RTSP_METHOD_GET_PARAMETER : RTSP_METHOD_OPTIONS;
if (rtsp_prepare_request(session, method, NULL, extra_headers) < 0) {
if (rtsp_prepare_request(session, method, NULL, NULL) < 0) {
logger(LOG_ERROR, "RTSP: Failed to prepare %s keepalive request", method);
return -1;
}
Expand Down Expand Up @@ -1589,13 +1595,11 @@ int rtsp_state_machine_advance(rtsp_session_t *session) {

case RTSP_STATE_SETUP:
if (session->use_playseek_range && session->playseek_range_start[0] != '\0') {
snprintf(extra_headers, sizeof(extra_headers), "Session: %s\r\nRange: clock=%s-\r\n", session->session_id,
session->playseek_range_start);
snprintf(extra_headers, sizeof(extra_headers), "Range: clock=%s-\r\n", session->playseek_range_start);
} else if (session->r2h_start[0] != '\0') {
snprintf(extra_headers, sizeof(extra_headers), "Session: %s\r\nRange: npt=%s-\r\n", session->session_id,
session->r2h_start);
snprintf(extra_headers, sizeof(extra_headers), "Range: npt=%s-\r\n", session->r2h_start);
} else {
snprintf(extra_headers, sizeof(extra_headers), "Session: %s\r\n", session->session_id);
extra_headers[0] = '\0';
}
if (rtsp_prepare_request(session, RTSP_METHOD_PLAY, NULL, extra_headers) < 0) {
logger(LOG_ERROR, "RTSP: Failed to prepare PLAY request");
Expand All @@ -1615,8 +1619,7 @@ int rtsp_state_machine_advance(rtsp_session_t *session) {
case RTSP_STATE_RECONNECTING:
/* Reconnection completed, now send TEARDOWN */
if (session->teardown_requested) {
snprintf(extra_headers, sizeof(extra_headers), "Session: %s\r\n", session->session_id);
if (rtsp_prepare_request(session, RTSP_METHOD_TEARDOWN, NULL, extra_headers) < 0) {
if (rtsp_prepare_request(session, RTSP_METHOD_TEARDOWN, NULL, NULL) < 0) {
logger(LOG_ERROR, "RTSP: Failed to prepare TEARDOWN after reconnect");
return -1;
}
Expand Down Expand Up @@ -2060,8 +2063,6 @@ static int rtsp_reconnect_for_teardown(rtsp_session_t *session) {
* Returns: 0 if TEARDOWN initiated, 1 if reconnect needed, -1 on error
*/
static int rtsp_initiate_teardown(rtsp_session_t *session) {
char extra_headers[RTSP_HEADERS_BUFFER_SIZE];

/* Check if socket is still valid */
if (session->socket >= 0) {
int sock_error = 0;
Expand All @@ -2079,9 +2080,7 @@ static int rtsp_initiate_teardown(rtsp_session_t *session) {
session->awaiting_response = 0;
session->response_buffer_pos = 0;

snprintf(extra_headers, sizeof(extra_headers), "Session: %s\r\n", session->session_id);

if (rtsp_prepare_request(session, RTSP_METHOD_TEARDOWN, NULL, extra_headers) < 0) {
if (rtsp_prepare_request(session, RTSP_METHOD_TEARDOWN, NULL, NULL) < 0) {
logger(LOG_ERROR, "RTSP: Failed to prepare TEARDOWN request");
return -1;
}
Expand Down Expand Up @@ -2936,6 +2935,11 @@ static int rtsp_handle_redirect(rtsp_session_t *session, const char *location) {

session->redirect_count++;

/* The session ID is scoped to the server that issued it. Drop any
* session learned from the previous server so requests to the
* redirected-to server don't carry a stale Session header. */
session->session_id[0] = '\0';

/* Close current connection and remove from poller properly */
if (session->socket >= 0) {
worker_cleanup_socket_from_epoll(session->epoll_fd, session->socket);
Expand Down
Loading