From 1867646ef906545a668ac3e029e5bed428951806 Mon Sep 17 00:00:00 2001 From: ltd0924 Date: Thu, 14 Aug 2025 10:33:06 +0800 Subject: [PATCH 1/5] [BugFix] fix control signal release failed --- fastdeploy/entrypoints/openai/api_server.py | 10 +- fastdeploy/entrypoints/openai/serving_chat.py | 103 ++++++++---------- .../entrypoints/openai/serving_completion.py | 16 +-- fastdeploy/inter_communicator/zmq_client.py | 14 ++- 4 files changed, 71 insertions(+), 72 deletions(-) diff --git a/fastdeploy/entrypoints/openai/api_server.py b/fastdeploy/entrypoints/openai/api_server.py index 2a4c0e7aba..bf4f909532 100644 --- a/fastdeploy/entrypoints/openai/api_server.py +++ b/fastdeploy/entrypoints/openai/api_server.py @@ -171,10 +171,10 @@ async def connection_manager(): await asyncio.wait_for(connection_semaphore.acquire(), timeout=0.001) yield except asyncio.TimeoutError: - api_server_logger.info(f"Reach max request release: {connection_semaphore.status()}") - if connection_semaphore.locked(): - connection_semaphore.release() - raise HTTPException(status_code=429, detail="Too many requests") + api_server_logger.info(f"Reach max request concurrency, semaphore status: {connection_semaphore.status()}") + raise HTTPException( + status_code=429, detail=f"Too many requests,current max concurrency is {args.max_concurrency}" + ) # TODO 传递真实引擎值 通过pid 获取状态 @@ -261,9 +261,11 @@ async def create_chat_completion(request: ChatCompletionRequest): inject_to_metadata(request) generator = await app.state.chat_handler.create_chat_completion(request) if isinstance(generator, ErrorResponse): + api_server_logger.debug(f"release: {connection_semaphore.status()}") connection_semaphore.release() return JSONResponse(content={"detail": generator.model_dump()}, status_code=generator.code) elif isinstance(generator, ChatCompletionResponse): + api_server_logger.debug(f"release: {connection_semaphore.status()}") connection_semaphore.release() return JSONResponse(content=generator.model_dump()) else: diff --git a/fastdeploy/entrypoints/openai/serving_chat.py b/fastdeploy/entrypoints/openai/serving_chat.py index b14f28e627..102b494f57 100644 --- a/fastdeploy/entrypoints/openai/serving_chat.py +++ b/fastdeploy/entrypoints/openai/serving_chat.py @@ -78,44 +78,45 @@ async def create_chat_completion(self, request: ChatCompletionRequest): api_server_logger.error(err_msg) return ErrorResponse(message=err_msg, code=400) - if request.user is not None: - request_id = f"chatcmpl-{request.user}-{uuid.uuid4()}" - else: - request_id = f"chatcmpl-{uuid.uuid4()}" - api_server_logger.info(f"create chat completion request: {request_id}") - text_after_process = None - try: - current_req_dict = request.to_dict_for_infer(request_id) - current_req_dict["arrival_time"] = time.time() - prompt_token_ids = self.engine_client.format_and_add_data(current_req_dict) - text_after_process = current_req_dict.get("text_after_process") - if isinstance(prompt_token_ids, np.ndarray): - prompt_token_ids = prompt_token_ids.tolist() - except Exception as e: - return ErrorResponse(code=400, message=str(e)) - - del current_req_dict try: - api_server_logger.debug(f"{self.engine_client.semaphore.status()}") if self.max_waiting_time < 0: await self.engine_client.semaphore.acquire() else: await asyncio.wait_for(self.engine_client.semaphore.acquire(), timeout=self.max_waiting_time) - except Exception: - return ErrorResponse(code=408, message=f"Request queued time exceed {self.max_waiting_time}") + api_server_logger.info(f"current {self.engine_client.semaphore.status()}") - if 
request.stream: - return self.chat_completion_stream_generator( - request, request_id, request.model, prompt_token_ids, text_after_process - ) - else: + if request.user is not None: + request_id = f"chatcmpl-{request.user}-{uuid.uuid4()}" + else: + request_id = f"chatcmpl-{uuid.uuid4()}" + api_server_logger.info(f"create chat completion request: {request_id}") + text_after_process = None try: - return await self.chat_completion_full_generator( - request, request_id, request.model, prompt_token_ids, text_after_process - ) + current_req_dict = request.to_dict_for_infer(request_id) + current_req_dict["arrival_time"] = time.time() + prompt_token_ids = self.engine_client.format_and_add_data(current_req_dict) + text_after_process = current_req_dict.get("text_after_process") + if isinstance(prompt_token_ids, np.ndarray): + prompt_token_ids = prompt_token_ids.tolist() except Exception as e: return ErrorResponse(code=400, message=str(e)) + del current_req_dict + + if request.stream: + return self.chat_completion_stream_generator( + request, request_id, request.model, prompt_token_ids, text_after_process + ) + else: + try: + return await self.chat_completion_full_generator( + request, request_id, request.model, prompt_token_ids, text_after_process + ) + except Exception as e: + return ErrorResponse(code=400, message=str(e)) + except Exception: + return ErrorResponse(code=408, message=f"Request queued time exceed {self.max_waiting_time}") + def _create_streaming_error_response(self, message: str) -> str: error_response = ErrorResponse( code=400, @@ -140,7 +141,6 @@ async def chat_completion_stream_generator( previous_num_tokens = 0 num_prompt_tokens = 0 num_choices = 1 - tool_called = False max_streaming_response_tokens = ( request.max_streaming_response_tokens if request.max_streaming_response_tokens is not None @@ -239,34 +239,25 @@ async def chat_completion_stream_generator( prompt_tokens_details=PromptTokenUsageInfo(cached_tokens=num_cached_tokens), ) yield f"data: {chunk.model_dump_json(exclude_unset=True)} \n\n" - api_server_logger.info(f"Chat Streaming response send_idx 0: {chunk.model_dump_json()}") first_iteration = False output = res["outputs"] delta_text = output["text"] output_top_logprobs = output["top_logprobs"] - previous_num_tokens += len(output["token_ids"]) logprobs_res: Optional[LogProbs] = None if request.logprobs and output_top_logprobs is not None: logprobs_res = self._create_chat_logprobs( output_top_logprobs, request.logprobs, request.top_logprobs ) - if self.engine_client.data_processor.tool_parser_obj and not res["finished"]: - tool_delta_message = output["tool_delta_message"] - if tool_delta_message is None: - continue - delta_message = tool_delta_message - delta_message.reasoning_content = output.get("reasoning_content") - if delta_message.tool_calls: - tool_called = True - else: - delta_message = DeltaMessage( - content=delta_text, - reasoning_content=output.get("reasoning_content"), - prompt_token_ids=None, - completion_token_ids=None, - tool_calls=None, - ) + + previous_num_tokens += len(output["token_ids"]) + delta_message = DeltaMessage( + content=delta_text, + reasoning_content=output.get("reasoning_content"), + prompt_token_ids=None, + completion_token_ids=None, + tool_calls=output.get("tool_call_content", []), + ) choice = ChatCompletionResponseStreamChoice( index=0, @@ -274,7 +265,6 @@ async def chat_completion_stream_generator( logprobs=logprobs_res, arrival_time=arrival_time, ) - if res["finished"]: num_choices -= 1 
work_process_metrics.e2e_request_latency.observe( @@ -284,7 +274,10 @@ async def chat_completion_stream_generator( max_tokens = request.max_completion_tokens or request.max_tokens if has_no_token_limit or previous_num_tokens != max_tokens: choice.finish_reason = "stop" - if tool_called: + if ( + self.engine_client.reasoning_parser == "ernie_x1" + and output.get("finish_reason", "") == "tool_calls" + ): choice.finish_reason = "tool_calls" else: choice.finish_reason = "length" @@ -306,9 +299,6 @@ async def chat_completion_stream_generator( if len(choices) == max_streaming_response_tokens or res["finished"]: chunk.choices = choices yield f"data: {chunk.model_dump_json(exclude_unset=True)}\n\n" - # 打印尾包 - if res["finished"]: - api_server_logger.info(f"Chat Streaming response last send: {chunk.model_dump_json()}") choices = [] if choices: @@ -414,8 +404,9 @@ async def chat_completion_full_generator( if task_is_finished: break finally: - self.engine_client.semaphore.release() dealer.close() + self.engine_client.semaphore.release() + api_server_logger.info(f"release {self.engine_client.semaphore.status()}") choices = [] output = final_res["outputs"] @@ -423,7 +414,7 @@ async def chat_completion_full_generator( role="assistant", content=output["text"], reasoning_content=output.get("reasoning_content"), - tool_calls=output.get("tool_call"), + tool_calls=output.get("tool_call_content"), prompt_token_ids=prompt_token_ids if request.return_token_ids else None, completion_token_ids=completion_token_ids if request.return_token_ids else None, text_after_process=text_after_process if request.return_token_ids else None, @@ -461,15 +452,13 @@ async def chat_completion_full_generator( prompt_tokens_details=PromptTokenUsageInfo(cached_tokens=final_res.get("num_cached_tokens", 0)), ) work_process_metrics.e2e_request_latency.observe(time.time() - final_res["metrics"]["request_start_time"]) - res = ChatCompletionResponse( + return ChatCompletionResponse( id=request_id, created=created_time, model=model_name, choices=choices, usage=usage, ) - api_server_logger.info(f"Chat response: {res.model_dump_json()}") - return res def _create_chat_logprobs( self, diff --git a/fastdeploy/entrypoints/openai/serving_completion.py b/fastdeploy/entrypoints/openai/serving_completion.py index a6aadcf060..64cc969039 100644 --- a/fastdeploy/entrypoints/openai/serving_completion.py +++ b/fastdeploy/entrypoints/openai/serving_completion.py @@ -101,6 +101,14 @@ async def create_completion(self, request: CompletionRequest): api_server_logger.info(f"start inference for request {num_choices}") prompt_batched_token_ids = [] text_after_process_list = [] + try: + if self.max_waiting_time < 0: + await self.engine_client.semaphore.acquire() + else: + await asyncio.wait_for(self.engine_client.semaphore.acquire(), timeout=self.max_waiting_time) + except Exception: + return ErrorResponse(code=408, message=f"Request queued time exceed {self.max_waiting_time}") + try: for idx, prompt in enumerate(request_prompts): request_id_idx = f"{request_id}-{idx}" @@ -117,14 +125,6 @@ async def create_completion(self, request: CompletionRequest): del current_req_dict - try: - if self.max_waiting_time < 0: - await self.engine_client.semaphore.acquire() - else: - await asyncio.wait_for(self.engine_client.semaphore.acquire(), timeout=self.max_waiting_time) - except Exception: - return ErrorResponse(code=408, message=f"Request queued time exceed {self.max_waiting_time}") - if request.stream: return self.completion_stream_generator( request=request, diff --git 
a/fastdeploy/inter_communicator/zmq_client.py b/fastdeploy/inter_communicator/zmq_client.py index 05e55929dd..5bbfa33ba0 100644 --- a/fastdeploy/inter_communicator/zmq_client.py +++ b/fastdeploy/inter_communicator/zmq_client.py @@ -31,10 +31,10 @@ class ZmqClient: """ def __init__(self, name, mode): - self.context = zmq.Context() + self.context = zmq.Context(4) self.socket = self.context.socket(mode) self.file_name = f"/dev/shm/{name}.socket" - self.router_path = f"/dev/shm/router_{name}.ipc" + self.router_path = f"./router_{name}.ipc" self.ZMQ_SNDHWM = int(envs.FD_ZMQ_SNDHWM) self.aggregate_send = envs.FD_USE_AGGREGATE_SEND @@ -67,6 +67,7 @@ def create_router(self): """ self.router = self.context.socket(zmq.ROUTER) self.router.setsockopt(zmq.SNDHWM, self.ZMQ_SNDHWM) + self.router.setsockopt(zmq.ROUTER_MANDATORY, 1) self.router.setsockopt(zmq.SNDTIMEO, -1) self.router.bind(f"ipc://{self.router_path}") @@ -125,6 +126,11 @@ def send_multipart(self, req_id, data): else: break + if self.req_dict[req_id] == -1: + if data[-1].finished: + with self.mutex: + self.req_dict.pop(req_id, None) + return try: start_send = time.time() if self.aggregate_send: @@ -133,7 +139,9 @@ def send_multipart(self, req_id, data): result = msgpack.packb([response.to_dict() for response in data]) self.router.send_multipart([self.req_dict[req_id], b"", result]) llm_logger.debug(f"send_multipart result: {req_id} len {len(data)} elapse: {time.time()-start_send}") - + except zmq.ZMQError as e: + llm_logger.error(f"[{req_id}] zmq error: {e}") + self.req_dict[req_id] = -1 except Exception as e: llm_logger.error(f"Send result to zmq client failed: {e}") From f47990247cfe7e727e6f2c4e81f6deb08f7f6471 Mon Sep 17 00:00:00 2001 From: ltd0924 Date: Thu, 14 Aug 2025 10:38:41 +0800 Subject: [PATCH 2/5] [BugFix] fix control signal release failed --- fastdeploy/entrypoints/openai/serving_chat.py | 34 ++++++++++++------- 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/fastdeploy/entrypoints/openai/serving_chat.py b/fastdeploy/entrypoints/openai/serving_chat.py index 102b494f57..765fc2aa53 100644 --- a/fastdeploy/entrypoints/openai/serving_chat.py +++ b/fastdeploy/entrypoints/openai/serving_chat.py @@ -141,6 +141,7 @@ async def chat_completion_stream_generator( previous_num_tokens = 0 num_prompt_tokens = 0 num_choices = 1 + tool_called = False max_streaming_response_tokens = ( request.max_streaming_response_tokens if request.max_streaming_response_tokens is not None @@ -239,25 +240,35 @@ async def chat_completion_stream_generator( prompt_tokens_details=PromptTokenUsageInfo(cached_tokens=num_cached_tokens), ) yield f"data: {chunk.model_dump_json(exclude_unset=True)} \n\n" + api_server_logger.info(f"Chat Streaming response send_idx 0: {chunk.model_dump_json()}") first_iteration = False output = res["outputs"] delta_text = output["text"] output_top_logprobs = output["top_logprobs"] + previous_num_tokens += len(output["token_ids"]) logprobs_res: Optional[LogProbs] = None if request.logprobs and output_top_logprobs is not None: logprobs_res = self._create_chat_logprobs( output_top_logprobs, request.logprobs, request.top_logprobs ) - previous_num_tokens += len(output["token_ids"]) - delta_message = DeltaMessage( - content=delta_text, - reasoning_content=output.get("reasoning_content"), - prompt_token_ids=None, - completion_token_ids=None, - tool_calls=output.get("tool_call_content", []), - ) + if self.engine_client.data_processor.tool_parser_obj and not res["finished"]: + tool_delta_message = output["tool_delta_message"] + if 
tool_delta_message is None: + continue + delta_message = tool_delta_message + delta_message.reasoning_content = output.get("reasoning_content") + if delta_message.tool_calls: + tool_called = True + else: + delta_message = DeltaMessage( + content=delta_text, + reasoning_content=output.get("reasoning_content"), + prompt_token_ids=None, + completion_token_ids=None, + tool_calls=None, + ) choice = ChatCompletionResponseStreamChoice( index=0, @@ -274,10 +285,7 @@ async def chat_completion_stream_generator( max_tokens = request.max_completion_tokens or request.max_tokens if has_no_token_limit or previous_num_tokens != max_tokens: choice.finish_reason = "stop" - if ( - self.engine_client.reasoning_parser == "ernie_x1" - and output.get("finish_reason", "") == "tool_calls" - ): + if tool_called: choice.finish_reason = "tool_calls" else: choice.finish_reason = "length" @@ -304,6 +312,8 @@ async def chat_completion_stream_generator( if choices: chunk.choices = choices yield f"data: {chunk.model_dump_json(exclude_unset=True)}\n\n" + if res["finished"]: + api_server_logger.info(f"Chat Streaming response last send: {chunk.model_dump_json()}") choices = [] if include_usage: From 64ba4bb87080895a87eee418aaf896302f367655 Mon Sep 17 00:00:00 2001 From: ltd0924 Date: Thu, 14 Aug 2025 10:43:53 +0800 Subject: [PATCH 3/5] update --- fastdeploy/entrypoints/openai/serving_chat.py | 32 +++++++++---------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/fastdeploy/entrypoints/openai/serving_chat.py b/fastdeploy/entrypoints/openai/serving_chat.py index 765fc2aa53..d950382acf 100644 --- a/fastdeploy/entrypoints/openai/serving_chat.py +++ b/fastdeploy/entrypoints/openai/serving_chat.py @@ -253,22 +253,22 @@ async def chat_completion_stream_generator( output_top_logprobs, request.logprobs, request.top_logprobs ) - if self.engine_client.data_processor.tool_parser_obj and not res["finished"]: - tool_delta_message = output["tool_delta_message"] - if tool_delta_message is None: - continue - delta_message = tool_delta_message - delta_message.reasoning_content = output.get("reasoning_content") - if delta_message.tool_calls: - tool_called = True - else: - delta_message = DeltaMessage( - content=delta_text, - reasoning_content=output.get("reasoning_content"), - prompt_token_ids=None, - completion_token_ids=None, - tool_calls=None, - ) + if self.engine_client.data_processor.tool_parser_obj and not res["finished"]: + tool_delta_message = output["tool_delta_message"] + if tool_delta_message is None: + continue + delta_message = tool_delta_message + delta_message.reasoning_content = output.get("reasoning_content") + if delta_message.tool_calls: + tool_called = True + else: + delta_message = DeltaMessage( + content=delta_text, + reasoning_content=output.get("reasoning_content"), + prompt_token_ids=None, + completion_token_ids=None, + tool_calls=None, + ) choice = ChatCompletionResponseStreamChoice( index=0, From 1b775ad6e83ddc9f1a318f20e93623cf2af7499f Mon Sep 17 00:00:00 2001 From: ltd0924 Date: Thu, 14 Aug 2025 10:47:04 +0800 Subject: [PATCH 4/5] update --- fastdeploy/entrypoints/openai/serving_chat.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/fastdeploy/entrypoints/openai/serving_chat.py b/fastdeploy/entrypoints/openai/serving_chat.py index d950382acf..13e26eb9c8 100644 --- a/fastdeploy/entrypoints/openai/serving_chat.py +++ b/fastdeploy/entrypoints/openai/serving_chat.py @@ -307,13 +307,13 @@ async def chat_completion_stream_generator( if len(choices) == 
max_streaming_response_tokens or res["finished"]: chunk.choices = choices yield f"data: {chunk.model_dump_json(exclude_unset=True)}\n\n" + if res["finished"]: + api_server_logger.info(f"Chat Streaming response last send: {chunk.model_dump_json()}") choices = [] if choices: chunk.choices = choices yield f"data: {chunk.model_dump_json(exclude_unset=True)}\n\n" - if res["finished"]: - api_server_logger.info(f"Chat Streaming response last send: {chunk.model_dump_json()}") choices = [] if include_usage: @@ -424,7 +424,7 @@ async def chat_completion_full_generator( role="assistant", content=output["text"], reasoning_content=output.get("reasoning_content"), - tool_calls=output.get("tool_call_content"), + tool_calls=output.get("tool_call"), prompt_token_ids=prompt_token_ids if request.return_token_ids else None, completion_token_ids=completion_token_ids if request.return_token_ids else None, text_after_process=text_after_process if request.return_token_ids else None, @@ -462,13 +462,15 @@ async def chat_completion_full_generator( prompt_tokens_details=PromptTokenUsageInfo(cached_tokens=final_res.get("num_cached_tokens", 0)), ) work_process_metrics.e2e_request_latency.observe(time.time() - final_res["metrics"]["request_start_time"]) - return ChatCompletionResponse( + res = ChatCompletionResponse( id=request_id, created=created_time, model=model_name, choices=choices, usage=usage, ) + api_server_logger.info(f"Chat response: {res.model_dump_json()}") + return res def _create_chat_logprobs( self, From 6ba261846c5d99bb5cbfdc667344c4db2726c982 Mon Sep 17 00:00:00 2001 From: ltd0924 Date: Thu, 14 Aug 2025 10:49:46 +0800 Subject: [PATCH 5/5] update --- fastdeploy/inter_communicator/zmq_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fastdeploy/inter_communicator/zmq_client.py b/fastdeploy/inter_communicator/zmq_client.py index 5bbfa33ba0..5143d9d47a 100644 --- a/fastdeploy/inter_communicator/zmq_client.py +++ b/fastdeploy/inter_communicator/zmq_client.py @@ -34,7 +34,7 @@ def __init__(self, name, mode): self.context = zmq.Context(4) self.socket = self.context.socket(mode) self.file_name = f"/dev/shm/{name}.socket" - self.router_path = f"./router_{name}.ipc" + self.router_path = f"/dev/shm/router_{name}.ipc" self.ZMQ_SNDHWM = int(envs.FD_ZMQ_SNDHWM) self.aggregate_send = envs.FD_USE_AGGREGATE_SEND
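
Note on the semaphore handling in patches 1-3: the acquisition of the concurrency slot is moved in front of any work that can fail (request formatting, tokenization, dispatch), so a slot is only ever held by a request that will reach one of the release paths. Below is a minimal sketch of that acquire-before-work / release-when-done pattern; the names guarded_request, semaphore, max_waiting_time and handle are illustrative assumptions rather than FastDeploy APIs, and the real code defers the release to the response generators for streaming requests.

import asyncio


async def guarded_request(semaphore: asyncio.Semaphore, max_waiting_time: float, handle):
    # Acquire the concurrency slot first; a timed-out acquire never held the
    # slot, so the 408 path has nothing to release.
    try:
        if max_waiting_time < 0:
            await semaphore.acquire()
        else:
            await asyncio.wait_for(semaphore.acquire(), timeout=max_waiting_time)
    except asyncio.TimeoutError:
        return {"code": 408, "message": f"Request queued time exceeds {max_waiting_time}"}
    # Only after the slot is held do the fallible steps run, so every held
    # slot is released exactly once, whether the handler succeeds or raises.
    try:
        return await handle()
    finally:
        semaphore.release()

The patches apply the same reordering to both serving_chat.py and serving_completion.py, which is why the acquire block now precedes format_and_add_data in both files.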
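
Note on the zmq_client.py changes: enabling zmq.ROUTER_MANDATORY makes the ROUTER socket raise zmq.ZMQError when a message cannot be routed to the recorded client identity (for example, the client has disconnected), instead of dropping it silently; the patch records that failure by setting req_dict[req_id] = -1, skips later chunks for that request, and pops the entry under the mutex once the final chunk arrives. A standalone sketch of that behaviour follows; the socket setup and the ipc endpoint are assumptions for illustration only.

import zmq

ctx = zmq.Context()
router = ctx.socket(zmq.ROUTER)
# With ROUTER_MANDATORY set, sending to an unknown or disconnected identity
# raises ZMQError instead of silently discarding the message.
router.setsockopt(zmq.ROUTER_MANDATORY, 1)
router.bind("ipc:///tmp/example_router.ipc")

req_dict = {}


def send_chunk(identity: bytes, req_id: str, payload: bytes):
    if req_dict.get(req_id) == -1:
        return  # peer already known to be gone; drop further chunks
    try:
        router.send_multipart([identity, b"", payload])
    except zmq.ZMQError:
        # Remember that this client is unreachable so the request can be
        # cleaned up once its final chunk is produced.
        req_dict[req_id] = -1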