22
33from __future__ import annotations
44
5- import asyncio
65from typing import Any
76
87import anyio
98import pytest
109
1110from mcp import types
1211from mcp .client .session import ClientSession
13- from mcp .server .lowlevel .server import Server , request_ctx
14- from mcp .shared .context import RequestContext
1512from mcp .server .events import RetainedValueStore , SubscriptionRegistry
1613from mcp .server .lowlevel import NotificationOptions
14+ from mcp .server .lowlevel .server import Server , request_ctx
1715from mcp .server .models import InitializationOptions
1816from mcp .server .session import ServerSession
17+ from mcp .shared .context import RequestContext
1918from mcp .shared .message import SessionMessage
2019from mcp .shared .session import RequestResponder
2120from mcp .types import (
22- EventEmitNotification ,
2321 EventListRequest ,
2422 EventListResult ,
2523 EventParams ,
26- EventsCapability ,
2724 EventSubscribeParams ,
2825 EventSubscribeRequest ,
2926 EventSubscribeResult ,
3330 EventUnsubscribeResult ,
3431 RejectedTopic ,
3532 RetainedEvent ,
36- ServerCapabilities ,
3733 SubscribedTopic ,
3834)
3935
40-
4136# Shared registry and store for the test server
4237_registry = SubscriptionRegistry ()
4338_retained_store = RetainedValueStore ()
@@ -141,7 +136,7 @@ async def _run_server(server_session: ServerSession, server: Server) -> None:
141136@pytest .fixture (autouse = True )
142137async def reset_registry ():
143138 """Reset the global registry and store between tests."""
144- global _registry , _retained_store
139+ global _registry , _retained_store # noqa: PLW0603
145140 _registry = SubscriptionRegistry ()
146141 _retained_store = RetainedValueStore ()
147142 yield
@@ -189,11 +184,14 @@ async def event_handler(params: EventParams):
189184 sub_result = await client_session .subscribe_events (["test/+" ])
190185 assert len (sub_result .subscribed ) == 1
191186
192- # Server emits
187+ # Server emits with an explicit timestamp, exercising the
188+ # branch where emit_event does NOT auto-generate one.
189+ explicit_ts = "2025-01-01T00:00:00+00:00"
193190 await server_session .emit_event (
194191 topic = "test/hello" ,
195192 payload = {"message" : "world" },
196193 event_id = "evt-1" ,
194+ timestamp = explicit_ts ,
197195 )
198196
199197 # Give the notification time to propagate
@@ -203,6 +201,7 @@ async def event_handler(params: EventParams):
203201 assert received_events [0 ].topic == "test/hello"
204202 assert received_events [0 ].payload == {"message" : "world" }
205203 assert received_events [0 ].event_id == "evt-1"
204+ assert received_events [0 ].timestamp == explicit_ts
206205
207206 tg .cancel_scope .cancel ()
208207 except (anyio .ClosedResourceError , anyio .EndOfStream ):
@@ -502,3 +501,86 @@ async def test_subscribe_rejects_undeclared_topic():
502501 tg .cancel_scope .cancel ()
503502 except (anyio .ClosedResourceError , anyio .EndOfStream ):
504503 pass
504+
505+
506+ @pytest .mark .anyio
507+ async def test_topic_matches_subscriptions_recompiles_on_cache_miss ():
508+ """_topic_matches_subscriptions should recompile when the cache entry is missing.
509+
510+ This exercises the fallback branch where a pattern is in ``_subscribed_patterns``
511+ but not in ``_subscription_regex_cache`` (e.g. after manual cache eviction).
512+ """
513+ server_to_client_send , server_to_client_receive = anyio .create_memory_object_stream [SessionMessage ](10 )
514+ client_to_server_send , client_to_server_receive = anyio .create_memory_object_stream [SessionMessage ](10 )
515+
516+ try :
517+ async with (
518+ server_to_client_send ,
519+ server_to_client_receive ,
520+ client_to_server_send ,
521+ client_to_server_receive ,
522+ ClientSession (
523+ server_to_client_receive ,
524+ client_to_server_send ,
525+ ) as client_session ,
526+ ):
527+ # Seed a pattern without populating the regex cache.
528+ client_session ._subscribed_patterns .add ("foo/+" )
529+ assert "foo/+" not in client_session ._subscription_regex_cache
530+
531+ assert client_session ._topic_matches_subscriptions ("foo/bar" ) is True
532+ # The cache should now be populated as a side effect.
533+ assert "foo/+" in client_session ._subscription_regex_cache
534+
535+ # Non-matching topic exercises the return False path.
536+ assert client_session ._topic_matches_subscriptions ("other/thing" ) is False
537+ except (anyio .ClosedResourceError , anyio .EndOfStream ):
538+ pass
539+
540+
541+ @pytest .mark .anyio
542+ async def test_subscribe_events_skips_recompile_for_cached_pattern ():
543+ """subscribe_events should not recompile a regex for an already-cached pattern.
544+
545+ Covers the branch where ``sub.pattern`` is already present in
546+ ``_subscription_regex_cache`` so the compile step is skipped.
547+ """
548+ server_to_client_send , server_to_client_receive = anyio .create_memory_object_stream [SessionMessage ](10 )
549+ client_to_server_send , client_to_server_receive = anyio .create_memory_object_stream [SessionMessage ](10 )
550+
551+ server = _create_test_server ()
552+ # Reset shared state for isolation.
553+ _registry ._subscriptions .clear ()
554+
555+ try :
556+ async with (
557+ ServerSession (
558+ client_to_server_receive ,
559+ server_to_client_send ,
560+ InitializationOptions (
561+ server_name = "test" ,
562+ server_version = "0.1.0" ,
563+ capabilities = server .get_capabilities (NotificationOptions (), {}),
564+ ),
565+ ) as server_session ,
566+ ClientSession (
567+ server_to_client_receive ,
568+ client_to_server_send ,
569+ message_handler = _message_handler ,
570+ ) as client_session ,
571+ anyio .create_task_group () as tg ,
572+ ):
573+ tg .start_soon (_run_server , server_session , server )
574+ await client_session .initialize ()
575+
576+ # First subscribe populates the cache.
577+ await client_session .subscribe_events (["test/+" ])
578+ cached_regex = client_session ._subscription_regex_cache ["test/+" ]
579+
580+ # Second subscribe to the same pattern should reuse the cached compile.
581+ await client_session .subscribe_events (["test/+" ])
582+ assert client_session ._subscription_regex_cache ["test/+" ] is cached_regex
583+
584+ tg .cancel_scope .cancel ()
585+ except (anyio .ClosedResourceError , anyio .EndOfStream ):
586+ pass
0 commit comments