1212import json
1313import logging
1414import mimetypes
15+ import uuid
16+ import warnings
1517from typing import Any , Literal
1618
1719from a2a .server .agent_execution import AgentExecutor , RequestContext
@@ -49,13 +51,21 @@ class StrandsA2AExecutor(AgentExecutor):
4951 # Handle special cases where format differs from extension
5052 FORMAT_MAPPINGS = {"jpg" : "jpeg" , "htm" : "html" , "3gp" : "three_gp" , "3gpp" : "three_gp" , "3g2" : "three_gp" }
5153
52- def __init__ (self , agent : SAAgent ):
54+ # A2A-compliant streaming mode
55+ _current_artifact_id : str | None
56+ _is_first_chunk : bool
57+
58+ def __init__ (self , agent : SAAgent , * , enable_a2a_compliant_streaming : bool = False ):
5359 """Initialize a StrandsA2AExecutor.
5460
5561 Args:
5662 agent: The Strands Agent instance to adapt to the A2A protocol.
63+ enable_a2a_compliant_streaming: If True, uses A2A-compliant streaming with
64+ artifact updates. If False, uses legacy status updates streaming behavior
65+ for backwards compatibility. Defaults to False.
5766 """
5867 self .agent = agent
68+ self .enable_a2a_compliant_streaming = enable_a2a_compliant_streaming
5969
6070 async def execute (
6171 self ,
@@ -104,12 +114,30 @@ async def _execute_streaming(self, context: RequestContext, updater: TaskUpdater
104114 else :
105115 raise ValueError ("No content blocks available" )
106116
117+ if not self .enable_a2a_compliant_streaming :
118+ warnings .warn (
119+ "The default A2A response stream implemented in the strands sdk does not conform to "
120+ "what is expected in the A2A spec. Please set the `enable_a2a_compliant_streaming` "
121+ "boolean to `True` on your `A2AServer` class to properly conform to the spec. "
122+ "In the next major version release, this will be the default behavior." ,
123+ UserWarning ,
124+ stacklevel = 3 ,
125+ )
126+
127+ if self .enable_a2a_compliant_streaming :
128+ self ._current_artifact_id = str (uuid .uuid4 ())
129+ self ._is_first_chunk = True
130+
107131 try :
108132 async for event in self .agent .stream_async (content_blocks ):
109133 await self ._handle_streaming_event (event , updater )
110134 except Exception :
111135 logger .exception ("Error in streaming execution" )
112136 raise
137+ finally :
138+ if self .enable_a2a_compliant_streaming :
139+ self ._current_artifact_id = None
140+ self ._is_first_chunk = True
113141
114142 async def _handle_streaming_event (self , event : dict [str , Any ], updater : TaskUpdater ) -> None :
115143 """Handle a single streaming event from the Strands Agent.
@@ -125,28 +153,60 @@ async def _handle_streaming_event(self, event: dict[str, Any], updater: TaskUpda
125153 logger .debug ("Streaming event: %s" , event )
126154 if "data" in event :
127155 if text_content := event ["data" ]:
128- await updater .update_status (
129- TaskState .working ,
130- new_agent_text_message (
131- text_content ,
132- updater .context_id ,
133- updater .task_id ,
134- ),
135- )
156+ if self .enable_a2a_compliant_streaming :
157+ await updater .add_artifact (
158+ [Part (root = TextPart (text = text_content ))],
159+ artifact_id = self ._current_artifact_id ,
160+ name = "agent_response" ,
161+ append = not self ._is_first_chunk ,
162+ )
163+ self ._is_first_chunk = False
164+ else :
165+ # Legacy use update_status with agent message
166+ await updater .update_status (
167+ TaskState .working ,
168+ new_agent_text_message (
169+ text_content ,
170+ updater .context_id ,
171+ updater .task_id ,
172+ ),
173+ )
136174 elif "result" in event :
137175 await self ._handle_agent_result (event ["result" ], updater )
138176
139177 async def _handle_agent_result (self , result : SAAgentResult | None , updater : TaskUpdater ) -> None :
140178 """Handle the final result from the Strands Agent.
141179
142- Processes the agent's final result, extracts text content from the response,
143- and adds it as an artifact to the task before marking the task as complete.
180+ For A2A-compliant streaming: sends the final artifact chunk marker and marks
181+ the task as complete. If no data chunks were previously sent, includes the
182+ result content.
183+
184+ For legacy streaming: adds the final result as a simple artifact without
185+ artifact_id tracking.
144186
145187 Args:
146188 result: The agent result object containing the final response, or None if no result.
147189 updater: The task updater for managing task state and adding the final artifact.
148190 """
149- if final_content := str (result ):
191+ if self .enable_a2a_compliant_streaming :
192+ if self ._is_first_chunk :
193+ final_content = str (result ) if result else ""
194+ parts = [Part (root = TextPart (text = final_content ))] if final_content else []
195+ await updater .add_artifact (
196+ parts ,
197+ artifact_id = self ._current_artifact_id ,
198+ name = "agent_response" ,
199+ last_chunk = True ,
200+ )
201+ else :
202+ await updater .add_artifact (
203+ [],
204+ artifact_id = self ._current_artifact_id ,
205+ name = "agent_response" ,
206+ append = True ,
207+ last_chunk = True ,
208+ )
209+ elif final_content := str (result ):
150210 await updater .add_artifact (
151211 [Part (root = TextPart (text = final_content ))],
152212 name = "agent_response" ,
0 commit comments