Skip to content

Commit 8a34c8f

Browse files
committed
fix qwen3.5_stream
1 parent 00eb78e commit 8a34c8f

3 files changed

Lines changed: 170 additions & 84 deletions

File tree

lightllm/server/api_models.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ class ToolCall(BaseModel):
8787

8888
id: Optional[str] = None
8989
index: Optional[int] = None
90-
type: Literal["function"] = "function"
90+
type: Optional[Literal["function"]] = None
9191
function: FunctionResponse
9292

9393

lightllm/server/api_openai.py

Lines changed: 93 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,22 @@ def _process_tools_stream(index: int, delta: str, parser_dict: Dict, request: Ch
160160
return normal_text, calls
161161

162162

163+
def _split_tool_argument_delta(arguments: Optional[str]) -> List[str]:
164+
"""Split a complete JSON argument string into OpenAI-style deltas."""
165+
if not arguments:
166+
return []
167+
if len(arguments) <= 2:
168+
return [arguments]
169+
if arguments[0] in "{[" and arguments[-1] in "}]":
170+
middle = arguments[1:-1]
171+
chunks = [arguments[0]]
172+
if middle:
173+
chunks.append(middle)
174+
chunks.append(arguments[-1])
175+
return [chunk for chunk in chunks if chunk]
176+
return [arguments]
177+
178+
163179
async def chat_completions_impl(request: ChatCompletionRequest, raw_request: Request) -> Response:
164180
from .api_http import g_objs
165181

@@ -342,6 +358,7 @@ async def chat_completions_impl(request: ChatCompletionRequest, raw_request: Req
342358
ToolCall(
343359
id=tool_id,
344360
index=getattr(call_info, "tool_index", None),
361+
type="function",
345362
function=FunctionResponse(name=call_info.name, arguments=call_info.parameters),
346363
)
347364
)
@@ -408,7 +425,7 @@ async def stream_results() -> AsyncGenerator[bytes, None]:
408425
choices=[choice_data],
409426
model=request.model,
410427
)
411-
yield f"data: {chunk.model_dump_json()}\n\n"
428+
yield f"data: {chunk.model_dump_json(exclude_none=True)}\n\n"
412429

413430
if request.tool_choice != "none" and request.tools:
414431
# parse_increment => returns (normal_text, calls)
@@ -417,7 +434,7 @@ async def stream_results() -> AsyncGenerator[bytes, None]:
417434
)
418435

419436
# 1) if there's normal_text, output it as normal content
420-
if normal_text:
437+
if normal_text and (normal_text.strip() or not has_emitted_tool_calls[sub_req_id]):
421438
choice_data = ChatCompletionStreamResponseChoice(
422439
index=choice_index,
423440
delta=DeltaMessage(role="assistant", content=normal_text),
@@ -429,7 +446,7 @@ async def stream_results() -> AsyncGenerator[bytes, None]:
429446
choices=[choice_data],
430447
model=request.model,
431448
)
432-
yield f"data: {chunk.model_dump_json()}\n\n"
449+
yield f"data: {chunk.model_dump_json(exclude_none=True)}\n\n"
433450

434451
# 2) if we found calls, we output them as separate chunk(s)
435452
history_tool_calls_cnt = _get_history_tool_calls_cnt(request)
@@ -456,7 +473,8 @@ async def stream_results() -> AsyncGenerator[bytes, None]:
456473
call_item.parameters = remaining_call
457474

458475
tool_parser = getattr(g_objs.args, "tool_call_parser", None) or "llama3"
459-
id_key = (choice_index, call_item.tool_index)
476+
stream_index = getattr(call_item, "tool_index", None)
477+
id_key = (choice_index, stream_index)
460478
if call_item.name:
461479
if id_key not in stream_tool_call_ids:
462480
stream_tool_call_ids[id_key] = _process_tool_call_id(
@@ -468,26 +486,74 @@ async def stream_results() -> AsyncGenerator[bytes, None]:
468486
tool_call_id = stream_tool_call_ids.get(id_key)
469487
function_name = None
470488

471-
tool_call = ToolCall(
472-
id=tool_call_id,
473-
index=getattr(call_item, "tool_index", None),
474-
function=FunctionResponse(
475-
name=function_name,
476-
arguments=call_item.parameters,
477-
),
478-
)
479-
choice_data = ChatCompletionStreamResponseChoice(
480-
index=choice_index,
481-
delta=DeltaMessage(role="assistant", tool_calls=[tool_call]),
482-
finish_reason=None,
483-
)
484-
chunk = ChatCompletionStreamResponse(
485-
id=group_request_id,
486-
created=created_time,
487-
choices=[choice_data],
488-
model=request.model,
489-
)
490-
yield f"data: {chunk.model_dump_json()}\n\n"
489+
is_tool_head = call_item.name is not None
490+
491+
if is_tool_head and call_item.parameters:
492+
head_tool_call = ToolCall(
493+
id=tool_call_id,
494+
index=stream_index,
495+
type="function",
496+
function=FunctionResponse(
497+
name=function_name,
498+
arguments="",
499+
),
500+
)
501+
head_choice = ChatCompletionStreamResponseChoice(
502+
index=choice_index,
503+
delta=DeltaMessage(tool_calls=[head_tool_call]),
504+
finish_reason=None,
505+
)
506+
head_chunk = ChatCompletionStreamResponse(
507+
id=group_request_id,
508+
created=created_time,
509+
choices=[head_choice],
510+
model=request.model,
511+
)
512+
yield f"data: {head_chunk.model_dump_json(exclude_none=True)}\n\n"
513+
514+
for arg_delta in _split_tool_argument_delta(call_item.parameters):
515+
arg_tool_call = ToolCall(
516+
index=stream_index,
517+
function=FunctionResponse(arguments=arg_delta),
518+
)
519+
arg_choice = ChatCompletionStreamResponseChoice(
520+
index=choice_index,
521+
delta=DeltaMessage(tool_calls=[arg_tool_call]),
522+
finish_reason=None,
523+
)
524+
arg_chunk = ChatCompletionStreamResponse(
525+
id=group_request_id,
526+
created=created_time,
527+
choices=[arg_choice],
528+
model=request.model,
529+
)
530+
yield f"data: {arg_chunk.model_dump_json(exclude_none=True)}\n\n"
531+
else:
532+
tool_call = ToolCall(
533+
id=tool_call_id if is_tool_head else None,
534+
index=stream_index,
535+
type="function" if is_tool_head else None,
536+
function=FunctionResponse(
537+
name=function_name,
538+
arguments=(
539+
(call_item.parameters if call_item.parameters is not None else "")
540+
if is_tool_head
541+
else call_item.parameters
542+
),
543+
),
544+
)
545+
choice_data = ChatCompletionStreamResponseChoice(
546+
index=choice_index,
547+
delta=DeltaMessage(tool_calls=[tool_call]),
548+
finish_reason=None,
549+
)
550+
chunk = ChatCompletionStreamResponse(
551+
id=group_request_id,
552+
created=created_time,
553+
choices=[choice_data],
554+
model=request.model,
555+
)
556+
yield f"data: {chunk.model_dump_json(exclude_none=True)}\n\n"
491557
else:
492558
delta_message = DeltaMessage(role="assistant", content=delta)
493559
stream_choice = ChatCompletionStreamResponseChoice(
@@ -499,7 +565,7 @@ async def stream_results() -> AsyncGenerator[bytes, None]:
499565
model=request.model,
500566
choices=[stream_choice],
501567
)
502-
yield f"data: {stream_resp.model_dump_json()}\n\n"
568+
yield f"data: {stream_resp.model_dump_json(exclude_none=True)}\n\n"
503569

504570
# Emit a per-choice final empty chunk with finish_reason.
505571
if current_finish_reason is not None:
@@ -516,7 +582,7 @@ async def stream_results() -> AsyncGenerator[bytes, None]:
516582
model=request.model,
517583
choices=[final_choice],
518584
)
519-
yield f"data: {final_chunk.model_dump_json()}\n\n"
585+
yield f"data: {final_chunk.model_dump_json(exclude_none=True)}\n\n"
520586

521587
if request.stream_options and request.stream_options.include_usage:
522588
usage = UsageInfo(
@@ -531,7 +597,7 @@ async def stream_results() -> AsyncGenerator[bytes, None]:
531597
model=request.model,
532598
usage=usage,
533599
)
534-
yield f"data: {usage_chunk.model_dump_json()}\n\n"
600+
yield f"data: {usage_chunk.model_dump_json(exclude_none=True)}\n\n"
535601

536602
background_tasks = BackgroundTasks()
537603
return StreamingResponse(stream_results(), media_type="text/event-stream", background=background_tasks)

lightllm/server/function_call_parser.py

Lines changed: 76 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -1846,6 +1846,48 @@ def _parse_function_call(self, function_str: str, tools: List[Tool]) -> Optional
18461846
parameters=json.dumps(param_dict, ensure_ascii=False),
18471847
)
18481848

1849+
def _build_partial_arguments_json(self, func_name: str, partial_body: str, tools: List[Tool]) -> Optional[str]:
    """Build the current argument JSON from a partial XML tool-call body.

    Scans *partial_body* with ``self.parameter_regex`` (each match appears
    to carry ``name>value`` text, split below on the first ``>`` — TODO
    confirm against the regex definition), converts the values via
    ``self._convert_param_value``, and returns them serialized as a JSON
    object string. Returns ``None`` when no parameter has produced any
    streamable content yet.
    """
    param_matches = self.parameter_regex.findall(partial_body)
    if not param_matches:
        # No <parameter=...> tags seen yet in the streamed body.
        return None

    param_config = self._get_param_config(func_name, tools)
    param_dict = {}
    has_visible_value = False

    for match in param_matches:
        try:
            # Name and value are separated by the first '>' in the match.
            idx = match.index(">")
        except ValueError:
            # Malformed match without a separator — skip it.
            continue

        param_name = match[:idx].strip()
        param_value = match[idx + 1 :]
        # Trim a single leading/trailing newline that the XML layout adds.
        if param_value.startswith("\n"):
            param_value = param_value[1:]
        if param_value.endswith("\n"):
            param_value = param_value[:-1]

        if param_value.strip():
            has_visible_value = True
        elif (
            f"<parameter={param_name}>" in partial_body
            and f"<parameter={param_name}>{param_value}</parameter>" in partial_body
        ):
            # Closed empty-string parameter. We can safely emit it.
            has_visible_value = True
        else:
            # Parameter tag is present but its value has not started streaming yet.
            continue

        param_dict[param_name] = self._convert_param_value(param_value, param_name, param_config, func_name)

    if not param_dict and not has_visible_value:
        # Every discovered parameter is still waiting for its value.
        return None

    return json.dumps(param_dict, ensure_ascii=False)
1890+
18491891
def detect_and_parse(self, text: str, tools: List[Tool]) -> StreamingParseResult:
18501892
idx = text.find(self.bot_token)
18511893
normal_text = text[:idx].strip() if idx != -1 else text
@@ -1865,79 +1907,57 @@ def detect_and_parse(self, text: str, tools: List[Tool]) -> StreamingParseResult
18651907
func_str = match[0] if match[0] else match[1]
18661908
item = self._parse_function_call(func_str, tools)
18671909
if item:
1910+
item.tool_index = len(calls)
18681911
calls.append(item)
18691912

18701913
return StreamingParseResult(normal_text=normal_text, calls=calls)
18711914

18721915
def parse_streaming_increment(self, new_text: str, tools: List[Tool]) -> StreamingParseResult:
    """Streaming incremental parsing for Qwen3-Coder XML tool calls.

    Accumulates *new_text* into ``self._buffer`` and repeatedly consumes
    complete ``bot_token``..``eot_token`` blocks from it (presumably the
    model's tool-call open/close markers — confirm against the parser's
    constants). Text outside those blocks is returned as ``normal_text``;
    each fully closed block is parsed into ``ToolCallItem``s with
    monotonically increasing ``tool_index`` values tracked across calls
    via ``self.current_tool_id``.
    """
    self._buffer += new_text
    # Lazily cache the tool-name lookup table on first use.
    if not hasattr(self, "_tool_indices"):
        self._tool_indices = self._get_tool_indices(tools)

    normal_text = ""
    calls: List[ToolCallItem] = []

    # Drain as many complete tool-call blocks from the buffer as possible.
    while True:
        current_text = self._buffer
        tool_call_start = current_text.find(self.bot_token)

        if tool_call_start == -1:
            # A partial bot_token may be split across chunks: hold the
            # buffer and wait for more text rather than emitting it.
            partial_len = self._ends_with_partial_token(current_text, self.bot_token)
            if partial_len:
                return StreamingParseResult(normal_text=normal_text, calls=calls)
            if current_text:
                # No tool-call marker: everything is plain content
                # (stray eot_tokens are stripped).
                normal_text += current_text.replace(self.eot_token, "")
            self._buffer = ""
            return StreamingParseResult(normal_text=normal_text, calls=calls)

        if tool_call_start > 0:
            # Flush the plain text that precedes the tool-call marker.
            normal_text += current_text[:tool_call_start]
            self._buffer = current_text[tool_call_start:]
            current_text = self._buffer

        eot_pos = current_text.find(self.eot_token)
        if eot_pos == -1:
            # Tool call opened but not yet closed — wait for more chunks.
            return StreamingParseResult(normal_text=normal_text, calls=calls)

        # A complete bot..eot block is available: parse it now.
        complete_block = current_text[: eot_pos + len(self.eot_token)]
        func_matches = self.function_regex.findall(complete_block)

        if self.current_tool_id == -1:
            # First tool call of this request.
            self.current_tool_id = 0

        for match in func_matches:
            # The regex appears to have two alternative capture groups;
            # whichever matched holds the function body — TODO confirm.
            func_str = match[0] if match[0] else match[1]
            item = self._parse_function_call(func_str, tools)
            if item:
                item.tool_index = self.current_tool_id
                calls.append(item)
                self.current_tool_id += 1

        # Drop the consumed block and loop for any further blocks.
        self._buffer = current_text[eot_pos + len(self.eot_token) :].lstrip()
19411961

19421962

19431963
class FunctionCallParser:

0 commit comments

Comments
 (0)