diff --git a/sentry_sdk/integrations/openai.py b/sentry_sdk/integrations/openai.py index 073f4546c6..6707f8194b 100644 --- a/sentry_sdk/integrations/openai.py +++ b/sentry_sdk/integrations/openai.py @@ -51,6 +51,7 @@ from sentry_sdk._types import TextPart from openai.types.responses import ResponseInputParam, SequenceNotStr + from openai.types.responses import ResponseStreamEvent from openai import Omit try: @@ -67,6 +68,8 @@ from openai.resources.chat.completions import Completions, AsyncCompletions from openai.resources import Embeddings, AsyncEmbeddings + from openai import Stream, AsyncStream + if TYPE_CHECKING: from openai.types.chat import ( ChatCompletionMessageParam, @@ -574,9 +577,38 @@ def _new_chat_completion_common(f: "Any", *args: "Any", **kwargs: "Any") -> "Any start_time = time.perf_counter() response = yield f, args, kwargs - if is_streaming_response: - _set_streaming_completions_api_output_data( - span, response, kwargs, integration, start_time, finish_span=True + # Attribute check to fail gracefully if the attribute is not present in future `openai` versions. + if isinstance(response, Stream) and hasattr(response, "_iterator"): + messages = kwargs.get("messages") + + if messages is not None and isinstance(messages, str): + messages = [messages] + + response._iterator = _wrap_synchronous_completions_chunk_iterator( + span=span, + integration=integration, + start_time=start_time, + messages=messages, + response=response, + old_iterator=response._iterator, + finish_span=True, + ) + + # Attribute check to fail gracefully if the attribute is not present in future `openai` versions. 
+ elif isinstance(response, AsyncStream) and hasattr(response, "_iterator"): + messages = kwargs.get("messages") + + if messages is not None and isinstance(messages, str): + messages = [messages] + + response._iterator = _wrap_asynchronous_completions_chunk_iterator( + span=span, + integration=integration, + start_time=start_time, + messages=messages, + response=response, + old_iterator=response._iterator, + finish_span=True, ) else: _set_completions_api_output_data( @@ -607,116 +639,241 @@ def _set_completions_api_output_data( ) -def _set_streaming_completions_api_output_data( +def _wrap_synchronous_completions_chunk_iterator( span: "Span", - response: "Any", - kwargs: "dict[str, Any]", integration: "OpenAIIntegration", - start_time: "Optional[float]" = None, - finish_span: bool = True, -) -> None: - messages = kwargs.get("messages") + start_time: "Optional[float]", + messages: "Optional[Iterable[ChatCompletionMessageParam]]", + response: "Stream[ChatCompletionChunk]", + old_iterator: "Iterator[ChatCompletionChunk]", + finish_span: "bool", +) -> "Iterator[ChatCompletionChunk]": + """ + Sets information received while iterating the response stream on the AI Client Span. + Compute token count based on inputs and outputs using tiktoken if token counts are not in the model response. + Responsible for closing the AI Client Span if instructed to by the `finish_span` argument. 
+ """ + ttft = None + data_buf: "list[list[str]]" = [] # one for each choice - if messages is not None and isinstance(messages, str): - messages = [messages] + for x in old_iterator: + span.set_data(SPANDATA.GEN_AI_RESPONSE_MODEL, x.model) - ttft: "Optional[float]" = None + with capture_internal_exceptions(): + if hasattr(x, "choices"): + choice_index = 0 + for choice in x.choices: + if hasattr(choice, "delta") and hasattr(choice.delta, "content"): + if start_time is not None and ttft is None: + ttft = time.perf_counter() - start_time + content = choice.delta.content + if len(data_buf) <= choice_index: + data_buf.append([]) + data_buf[choice_index].append(content or "") + choice_index += 1 + + yield x + + with capture_internal_exceptions(): + if ttft is not None: + set_data_normalized( + span, SPANDATA.GEN_AI_RESPONSE_TIME_TO_FIRST_TOKEN, ttft + ) + if len(data_buf) > 0: + all_responses = ["".join(chunk) for chunk in data_buf] + if should_send_default_pii() and integration.include_prompts: + set_data_normalized(span, SPANDATA.GEN_AI_RESPONSE_TEXT, all_responses) + _calculate_token_usage( + messages, + response, + span, + all_responses, + integration.count_tokens, + ) + + if finish_span: + span.__exit__(None, None, None) + + +async def _wrap_asynchronous_completions_chunk_iterator( + span: "Span", + integration: "OpenAIIntegration", + start_time: "Optional[float]", + messages: "Optional[Iterable[ChatCompletionMessageParam]]", + response: "AsyncStream[ChatCompletionChunk]", + old_iterator: "AsyncIterator[ChatCompletionChunk]", + finish_span: "bool", +) -> "AsyncIterator[ChatCompletionChunk]": + """ + Sets information received while iterating the response stream on the AI Client Span. + Compute token count based on inputs and outputs using tiktoken if token counts are not in the model response. + Responsible for closing the AI Client Span if instructed to by the `finish_span` argument. 
+ """ + ttft = None data_buf: "list[list[str]]" = [] # one for each choice - old_iterator = response._iterator - - def new_iterator() -> "Iterator[ChatCompletionChunk]": - nonlocal ttft - for x in old_iterator: - span.set_data(SPANDATA.GEN_AI_RESPONSE_MODEL, x.model) - - with capture_internal_exceptions(): - if hasattr(x, "choices"): - choice_index = 0 - for choice in x.choices: - if hasattr(choice, "delta") and hasattr( - choice.delta, "content" - ): - if start_time is not None and ttft is None: - ttft = time.perf_counter() - start_time - content = choice.delta.content - if len(data_buf) <= choice_index: - data_buf.append([]) - data_buf[choice_index].append(content or "") - choice_index += 1 - - yield x + async for x in old_iterator: + span.set_data(SPANDATA.GEN_AI_RESPONSE_MODEL, x.model) with capture_internal_exceptions(): - if ttft is not None: - set_data_normalized( - span, SPANDATA.GEN_AI_RESPONSE_TIME_TO_FIRST_TOKEN, ttft + if hasattr(x, "choices"): + choice_index = 0 + for choice in x.choices: + if hasattr(choice, "delta") and hasattr(choice.delta, "content"): + if start_time is not None and ttft is None: + ttft = time.perf_counter() - start_time + content = choice.delta.content + if len(data_buf) <= choice_index: + data_buf.append([]) + data_buf[choice_index].append(content or "") + choice_index += 1 + + yield x + + with capture_internal_exceptions(): + if ttft is not None: + set_data_normalized( + span, SPANDATA.GEN_AI_RESPONSE_TIME_TO_FIRST_TOKEN, ttft + ) + if len(data_buf) > 0: + all_responses = ["".join(chunk) for chunk in data_buf] + if should_send_default_pii() and integration.include_prompts: + set_data_normalized(span, SPANDATA.GEN_AI_RESPONSE_TEXT, all_responses) + _calculate_token_usage( + messages, + response, + span, + all_responses, + integration.count_tokens, + ) + + if finish_span: + span.__exit__(None, None, None) + + +def _wrap_synchronous_responses_event_iterator( + span: "Span", + integration: "OpenAIIntegration", + start_time: 
"Optional[float]", + input: "Optional[Union[str, ResponseInputParam]]", + response: "Stream[ResponseStreamEvent]", + old_iterator: "Iterator[ResponseStreamEvent]", + finish_span: "bool", +) -> "Iterator[ResponseStreamEvent]": + """ + Sets information received while iterating the response stream on the AI Client Span. + Compute token count based on inputs and outputs using tiktoken if token counts are not in the model response. + Responsible for closing the AI Client Span if instructed to by the `finish_span` argument. + """ + ttft = None + data_buf: "list[list[str]]" = [] # one for each choice + + count_tokens_manually = True + for x in old_iterator: + with capture_internal_exceptions(): + if hasattr(x, "delta"): + if start_time is not None and ttft is None: + ttft = time.perf_counter() - start_time + if len(data_buf) == 0: + data_buf.append([]) + data_buf[0].append(x.delta or "") + + if isinstance(x, ResponseCompletedEvent): + span.set_data(SPANDATA.GEN_AI_RESPONSE_MODEL, x.response.model) + + _calculate_token_usage( + input, + x.response, + span, + None, + integration.count_tokens, ) - if len(data_buf) > 0: - all_responses = ["".join(chunk) for chunk in data_buf] - if should_send_default_pii() and integration.include_prompts: - set_data_normalized( - span, SPANDATA.GEN_AI_RESPONSE_TEXT, all_responses - ) + count_tokens_manually = False + + yield x + + with capture_internal_exceptions(): + if ttft is not None: + set_data_normalized( + span, SPANDATA.GEN_AI_RESPONSE_TIME_TO_FIRST_TOKEN, ttft + ) + if len(data_buf) > 0: + all_responses = ["".join(chunk) for chunk in data_buf] + if should_send_default_pii() and integration.include_prompts: + set_data_normalized(span, SPANDATA.GEN_AI_RESPONSE_TEXT, all_responses) + if count_tokens_manually: _calculate_token_usage( - messages, + input, response, span, all_responses, integration.count_tokens, ) - if finish_span: - span.__exit__(None, None, None) + if finish_span: + span.__exit__(None, None, None) + - async def 
new_iterator_async() -> "AsyncIterator[ChatCompletionChunk]": - nonlocal ttft - async for x in old_iterator: - span.set_data(SPANDATA.GEN_AI_RESPONSE_MODEL, x.model) - - with capture_internal_exceptions(): - if hasattr(x, "choices"): - choice_index = 0 - for choice in x.choices: - if hasattr(choice, "delta") and hasattr( - choice.delta, "content" - ): - if start_time is not None and ttft is None: - ttft = time.perf_counter() - start_time - content = choice.delta.content - if len(data_buf) <= choice_index: - data_buf.append([]) - data_buf[choice_index].append(content or "") - choice_index += 1 - - yield x +async def _wrap_asynchronous_responses_event_iterator( + span: "Span", + integration: "OpenAIIntegration", + start_time: "Optional[float]", + input: "Optional[Union[str, ResponseInputParam]]", + response: "AsyncStream[ResponseStreamEvent]", + old_iterator: "AsyncIterator[ResponseStreamEvent]", + finish_span: "bool", +) -> "AsyncIterator[ResponseStreamEvent]": + """ + Sets information received while iterating the response stream on the AI Client Span. + Compute token count based on inputs and outputs using tiktoken if token counts are not in the model response. + Responsible for closing the AI Client Span if instructed to by the `finish_span` argument. 
+ """ + ttft: "Optional[float]" = None + data_buf: "list[list[str]]" = [] # one for each choice + count_tokens_manually = True + async for x in old_iterator: with capture_internal_exceptions(): - if ttft is not None: - set_data_normalized( - span, SPANDATA.GEN_AI_RESPONSE_TIME_TO_FIRST_TOKEN, ttft + if hasattr(x, "delta"): + if start_time is not None and ttft is None: + ttft = time.perf_counter() - start_time + if len(data_buf) == 0: + data_buf.append([]) + data_buf[0].append(x.delta or "") + + if isinstance(x, ResponseCompletedEvent): + span.set_data(SPANDATA.GEN_AI_RESPONSE_MODEL, x.response.model) + + _calculate_token_usage( + input, + x.response, + span, + None, + integration.count_tokens, ) - if len(data_buf) > 0: - all_responses = ["".join(chunk) for chunk in data_buf] - if should_send_default_pii() and integration.include_prompts: - set_data_normalized( - span, SPANDATA.GEN_AI_RESPONSE_TEXT, all_responses - ) + count_tokens_manually = False + + yield x + + with capture_internal_exceptions(): + if ttft is not None: + set_data_normalized( + span, SPANDATA.GEN_AI_RESPONSE_TIME_TO_FIRST_TOKEN, ttft + ) + if len(data_buf) > 0: + all_responses = ["".join(chunk) for chunk in data_buf] + if should_send_default_pii() and integration.include_prompts: + set_data_normalized(span, SPANDATA.GEN_AI_RESPONSE_TEXT, all_responses) + if count_tokens_manually: _calculate_token_usage( - messages, + input, response, span, all_responses, integration.count_tokens, ) - - if finish_span: - span.__exit__(None, None, None) - - if str(type(response._iterator)) == "": - response._iterator = new_iterator_async() - else: - response._iterator = new_iterator() + if finish_span: + span.__exit__(None, None, None) def _set_responses_api_output_data( @@ -740,127 +897,6 @@ def _set_responses_api_output_data( ) -def _set_streaming_responses_api_output_data( - span: "Span", - response: "Any", - kwargs: "dict[str, Any]", - integration: "OpenAIIntegration", - start_time: "Optional[float]" = None, - 
finish_span: bool = True, -) -> None: - input = kwargs.get("input") - - if input is not None and isinstance(input, str): - input = [input] - - ttft: "Optional[float]" = None - data_buf: "list[list[str]]" = [] # one for each choice - - old_iterator = response._iterator - - def new_iterator() -> "Iterator[ChatCompletionChunk]": - nonlocal ttft - count_tokens_manually = True - for x in old_iterator: - with capture_internal_exceptions(): - if hasattr(x, "delta"): - if start_time is not None and ttft is None: - ttft = time.perf_counter() - start_time - if len(data_buf) == 0: - data_buf.append([]) - data_buf[0].append(x.delta or "") - - if isinstance(x, ResponseCompletedEvent): - span.set_data(SPANDATA.GEN_AI_RESPONSE_MODEL, x.response.model) - - _calculate_token_usage( - input, - x.response, - span, - None, - integration.count_tokens, - ) - count_tokens_manually = False - - yield x - - with capture_internal_exceptions(): - if ttft is not None: - set_data_normalized( - span, SPANDATA.GEN_AI_RESPONSE_TIME_TO_FIRST_TOKEN, ttft - ) - if len(data_buf) > 0: - all_responses = ["".join(chunk) for chunk in data_buf] - if should_send_default_pii() and integration.include_prompts: - set_data_normalized( - span, SPANDATA.GEN_AI_RESPONSE_TEXT, all_responses - ) - if count_tokens_manually: - _calculate_token_usage( - input, - response, - span, - all_responses, - integration.count_tokens, - ) - - if finish_span: - span.__exit__(None, None, None) - - async def new_iterator_async() -> "AsyncIterator[ChatCompletionChunk]": - nonlocal ttft - count_tokens_manually = True - async for x in old_iterator: - with capture_internal_exceptions(): - if hasattr(x, "delta"): - if start_time is not None and ttft is None: - ttft = time.perf_counter() - start_time - if len(data_buf) == 0: - data_buf.append([]) - data_buf[0].append(x.delta or "") - - if isinstance(x, ResponseCompletedEvent): - span.set_data(SPANDATA.GEN_AI_RESPONSE_MODEL, x.response.model) - - _calculate_token_usage( - input, - 
x.response, - span, - None, - integration.count_tokens, - ) - count_tokens_manually = False - - yield x - - with capture_internal_exceptions(): - if ttft is not None: - set_data_normalized( - span, SPANDATA.GEN_AI_RESPONSE_TIME_TO_FIRST_TOKEN, ttft - ) - if len(data_buf) > 0: - all_responses = ["".join(chunk) for chunk in data_buf] - if should_send_default_pii() and integration.include_prompts: - set_data_normalized( - span, SPANDATA.GEN_AI_RESPONSE_TEXT, all_responses - ) - if count_tokens_manually: - _calculate_token_usage( - input, - response, - span, - all_responses, - integration.count_tokens, - ) - if finish_span: - span.__exit__(None, None, None) - - if str(type(response._iterator)) == "<class 'async_generator'>": - response._iterator = new_iterator_async() - else: - response._iterator = new_iterator() - - def _set_embeddings_output_data( span: "Span", response: "Any", @@ -1065,9 +1101,38 @@ def _new_responses_create_common(f: "Any", *args: "Any", **kwargs: "Any") -> "An start_time = time.perf_counter() response = yield f, args, kwargs - if is_streaming_response: - _set_streaming_responses_api_output_data( - span, response, kwargs, integration, start_time, finish_span=True + # Attribute check to fail gracefully if the attribute is not present in future `openai` versions. + if isinstance(response, Stream) and hasattr(response, "_iterator"): + input = kwargs.get("input") + + if input is not None and isinstance(input, str): + input = [input] + + response._iterator = _wrap_synchronous_responses_event_iterator( + span=span, + integration=integration, + start_time=start_time, + input=input, + response=response, + old_iterator=response._iterator, + finish_span=True, + ) + + # Attribute check to fail gracefully if the attribute is not present in future `openai` versions. 
+ elif isinstance(response, AsyncStream) and hasattr(response, "_iterator"): + input = kwargs.get("input") + + if input is not None and isinstance(input, str): + input = [input] + + response._iterator = _wrap_asynchronous_responses_event_iterator( + span=span, + integration=integration, + start_time=start_time, + input=input, + response=response, + old_iterator=response._iterator, + finish_span=True, ) else: _set_responses_api_output_data(