
[BugFix] fix control signal release failed #3390

Merged: 6 commits, Aug 19, 2025
10 changes: 6 additions & 4 deletions fastdeploy/entrypoints/openai/api_server.py
@@ -175,10 +175,10 @@ async def connection_manager():
await asyncio.wait_for(connection_semaphore.acquire(), timeout=0.001)
yield
except asyncio.TimeoutError:
api_server_logger.info(f"Reach max request release: {connection_semaphore.status()}")
if connection_semaphore.locked():
connection_semaphore.release()
raise HTTPException(status_code=429, detail="Too many requests")
api_server_logger.info(f"Reach max request concurrency, semaphore status: {connection_semaphore.status()}")
raise HTTPException(
status_code=429, detail=f"Too many requests,current max concurrency is {args.max_concurrency}"
)


# TODO: pass the real engine value; query its status via the pid
@@ -265,9 +265,11 @@ async def create_chat_completion(request: ChatCompletionRequest):
inject_to_metadata(request)
generator = await app.state.chat_handler.create_chat_completion(request)
if isinstance(generator, ErrorResponse):
api_server_logger.debug(f"release: {connection_semaphore.status()}")
connection_semaphore.release()
return JSONResponse(content={"detail": generator.model_dump()}, status_code=generator.code)
elif isinstance(generator, ChatCompletionResponse):
api_server_logger.debug(f"release: {connection_semaphore.status()}")
connection_semaphore.release()
return JSONResponse(content=generator.model_dump())
else:
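The api_server.py change fixes the release logic in two places: the 429 branch no longer releases the connection semaphore (a timed-out acquire never held a permit, so releasing there could hand out a slot the server does not actually have), and the non-streaming chat path now logs the semaphore status and releases it explicitly before returning the JSONResponse. A minimal sketch of the admission pattern, assuming a plain asyncio.Semaphore and FastAPI (the real code uses FastDeploy's semaphore wrapper, whose status() helper is not shown here):

    import asyncio
    from contextlib import asynccontextmanager

    from fastapi import HTTPException

    MAX_CONCURRENCY = 512  # illustrative value; the server reads args.max_concurrency
    connection_semaphore = asyncio.Semaphore(MAX_CONCURRENCY)

    @asynccontextmanager
    async def connection_manager():
        """Admit a request only if a concurrency slot frees up almost immediately."""
        try:
            # A very short timeout turns a saturated semaphore into an immediate 429.
            await asyncio.wait_for(connection_semaphore.acquire(), timeout=0.001)
            yield
        except asyncio.TimeoutError:
            # Do NOT release here: this coroutine never obtained the permit.
            raise HTTPException(
                status_code=429,
                detail=f"Too many requests, current max concurrency is {MAX_CONCURRENCY}",
            )

The permit obtained here is handed back on the response path instead, which is what the create_chat_completion hunk above does before returning either the error or the full JSON response.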
68 changes: 34 additions & 34 deletions fastdeploy/entrypoints/openai/serving_chat.py
@@ -78,47 +78,47 @@ async def create_chat_completion(self, request: ChatCompletionRequest):
err_msg = f"Only master node can accept completion request, please send request to master node: {self.pod_ips[0]}"
api_server_logger.error(err_msg)
return ErrorResponse(message=err_msg, code=400)

if request.user is not None:
request_id = f"chatcmpl-{request.user}-{uuid.uuid4()}"
else:
request_id = f"chatcmpl-{uuid.uuid4()}"
api_server_logger.info(f"create chat completion request: {request_id}")
text_after_process = None
try:
current_req_dict = request.to_dict_for_infer(request_id)
if "chat_template" not in current_req_dict:
current_req_dict["chat_template"] = self.chat_template
current_req_dict["arrival_time"] = time.time()
prompt_token_ids = self.engine_client.format_and_add_data(current_req_dict)
text_after_process = current_req_dict.get("text_after_process")
if isinstance(prompt_token_ids, np.ndarray):
prompt_token_ids = prompt_token_ids.tolist()
except Exception as e:
return ErrorResponse(code=400, message=str(e))

del current_req_dict
try:
api_server_logger.debug(f"{self.engine_client.semaphore.status()}")
if self.max_waiting_time < 0:
await self.engine_client.semaphore.acquire()
else:
await asyncio.wait_for(self.engine_client.semaphore.acquire(), timeout=self.max_waiting_time)
except Exception:
return ErrorResponse(code=408, message=f"Request queued time exceed {self.max_waiting_time}")
api_server_logger.info(f"current {self.engine_client.semaphore.status()}")

if request.stream:
return self.chat_completion_stream_generator(
request, request_id, request.model, prompt_token_ids, text_after_process
)
else:
if request.user is not None:
request_id = f"chatcmpl-{request.user}-{uuid.uuid4()}"
else:
request_id = f"chatcmpl-{uuid.uuid4()}"
api_server_logger.info(f"create chat completion request: {request_id}")
text_after_process = None
try:
return await self.chat_completion_full_generator(
request, request_id, request.model, prompt_token_ids, text_after_process
)
current_req_dict = request.to_dict_for_infer(request_id)
if "chat_template" not in current_req_dict:
current_req_dict["chat_template"] = self.chat_template
current_req_dict["arrival_time"] = time.time()
prompt_token_ids = self.engine_client.format_and_add_data(current_req_dict)
text_after_process = current_req_dict.get("text_after_process")
if isinstance(prompt_token_ids, np.ndarray):
prompt_token_ids = prompt_token_ids.tolist()
except Exception as e:
return ErrorResponse(code=400, message=str(e))

del current_req_dict

if request.stream:
return self.chat_completion_stream_generator(
request, request_id, request.model, prompt_token_ids, text_after_process
)
else:
try:
return await self.chat_completion_full_generator(
request, request_id, request.model, prompt_token_ids, text_after_process
)
except Exception as e:
return ErrorResponse(code=400, message=str(e))
except Exception:
return ErrorResponse(code=408, message=f"Request queued time exceed {self.max_waiting_time}")

def _create_streaming_error_response(self, message: str) -> str:
error_response = ErrorResponse(
code=400,
@@ -254,6 +254,7 @@ async def chat_completion_stream_generator(
logprobs_res = self._create_chat_logprobs(
output_top_logprobs, request.logprobs, request.top_logprobs
)

if self.engine_client.data_processor.tool_parser_obj and not res["finished"]:
tool_delta_message = output["tool_delta_message"]
if tool_delta_message is None:
@@ -277,7 +278,6 @@
logprobs=logprobs_res,
arrival_time=arrival_time,
)

if res["finished"]:
num_choices -= 1
work_process_metrics.e2e_request_latency.observe(
@@ -309,7 +309,6 @@
if len(choices) == max_streaming_response_tokens or res["finished"]:
chunk.choices = choices
yield f"data: {chunk.model_dump_json(exclude_unset=True)}\n\n"
# log the final streaming chunk
if res["finished"]:
api_server_logger.info(f"Chat Streaming response last send: {chunk.model_dump_json()}")
choices = []
@@ -417,8 +416,9 @@ async def chat_completion_full_generator(
if task_is_finished:
break
finally:
self.engine_client.semaphore.release()
dealer.close()
self.engine_client.semaphore.release()
api_server_logger.info(f"release {self.engine_client.semaphore.status()}")

choices = []
output = final_res["outputs"]
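In serving_chat.py the engine-side semaphore is now acquired, with an optional max_waiting_time deadline, before the request is formatted and handed to the engine, and the finally block of the full generator closes the ZMQ dealer before releasing the slot and then logs the semaphore status so leaks stay visible. A minimal sketch of that shape, assuming an asyncio.Semaphore and a close()-able dealer object (names are illustrative, not the FastDeploy API):

    import asyncio

    async def handle_request(semaphore: asyncio.Semaphore, dealer, max_waiting_time: float):
        """Hold the concurrency slot for the whole request; return it only after teardown."""
        try:
            if max_waiting_time < 0:
                await semaphore.acquire()  # wait as long as it takes
            else:
                await asyncio.wait_for(semaphore.acquire(), timeout=max_waiting_time)
        except asyncio.TimeoutError:
            return {"code": 408, "message": f"Request queued time exceed {max_waiting_time}"}

        try:
            # ... format the request, send it over the dealer, collect the outputs ...
            return {"code": 200}
        finally:
            # Tear the transport down first, then give the slot back, so the next
            # admitted request never races a half-closed connection.
            dealer.close()
            semaphore.release()
            # the real code also logs self.engine_client.semaphore.status() here
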
16 changes: 8 additions & 8 deletions fastdeploy/entrypoints/openai/serving_completion.py
@@ -101,6 +101,14 @@ async def create_completion(self, request: CompletionRequest):
api_server_logger.info(f"start inference for request {num_choices}")
prompt_batched_token_ids = []
text_after_process_list = []
try:
if self.max_waiting_time < 0:
await self.engine_client.semaphore.acquire()
else:
await asyncio.wait_for(self.engine_client.semaphore.acquire(), timeout=self.max_waiting_time)
except Exception:
return ErrorResponse(code=408, message=f"Request queued time exceed {self.max_waiting_time}")

try:
for idx, prompt in enumerate(request_prompts):
request_id_idx = f"{request_id}-{idx}"
@@ -117,14 +125,6 @@

del current_req_dict

try:
if self.max_waiting_time < 0:
await self.engine_client.semaphore.acquire()
else:
await asyncio.wait_for(self.engine_client.semaphore.acquire(), timeout=self.max_waiting_time)
except Exception:
return ErrorResponse(code=408, message=f"Request queued time exceed {self.max_waiting_time}")

if request.stream:
return self.completion_stream_generator(
request=request,
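serving_completion.py gets the same reordering: the slot is acquired (or the 408 returned) before the per-prompt loop, so a batched completion that cannot be admitted never calls format_and_add_data for any of its prompts. A rough sketch of that order, with illustrative names and request fields rather than the real handler signature:

    import asyncio

    async def create_completion_sketch(engine_client, semaphore, prompts, max_waiting_time):
        # 1) Admission first: nothing is enqueued if the slot cannot be obtained in time.
        try:
            if max_waiting_time < 0:
                await semaphore.acquire()
            else:
                await asyncio.wait_for(semaphore.acquire(), timeout=max_waiting_time)
        except asyncio.TimeoutError:
            return {"code": 408, "message": f"Request queued time exceed {max_waiting_time}"}

        # 2) Only then format and enqueue every prompt of the batch.
        prompt_batched_token_ids = []
        for idx, prompt in enumerate(prompts):
            req_dict = {"request_id": f"cmpl-{idx}", "prompt": prompt}  # fields are illustrative
            prompt_batched_token_ids.append(engine_client.format_and_add_data(req_dict))
        return prompt_batched_token_ids
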
12 changes: 10 additions & 2 deletions fastdeploy/inter_communicator/zmq_client.py
@@ -31,7 +31,7 @@ class ZmqClient:
"""

def __init__(self, name, mode):
self.context = zmq.Context()
self.context = zmq.Context(4)
self.socket = self.context.socket(mode)
self.file_name = f"/dev/shm/{name}.socket"
self.router_path = f"/dev/shm/router_{name}.ipc"
@@ -67,6 +67,7 @@ def create_router(self):
"""
self.router = self.context.socket(zmq.ROUTER)
self.router.setsockopt(zmq.SNDHWM, self.ZMQ_SNDHWM)
self.router.setsockopt(zmq.ROUTER_MANDATORY, 1)
self.router.setsockopt(zmq.SNDTIMEO, -1)
self.router.bind(f"ipc://{self.router_path}")

@@ -125,6 +126,11 @@ def send_multipart(self, req_id, data):
else:
break

if self.req_dict[req_id] == -1:
if data[-1].finished:
with self.mutex:
self.req_dict.pop(req_id, None)
return
try:
start_send = time.time()
if self.aggregate_send:
@@ -133,7 +139,9 @@
result = msgpack.packb([response.to_dict() for response in data])
self.router.send_multipart([self.req_dict[req_id], b"", result])
llm_logger.debug(f"send_multipart result: {req_id} len {len(data)} elapse: {time.time()-start_send}")

except zmq.ZMQError as e:
llm_logger.error(f"[{req_id}] zmq error: {e}")
self.req_dict[req_id] = -1
except Exception as e:
llm_logger.error(f"Send result to zmq client failed: {e}")

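On the transport side, the ZMQ context now uses four I/O threads, and ROUTER_MANDATORY makes the router socket raise zmq.ZMQError (EHOSTUNREACH) when a result is sent to a client identity that has already disconnected, instead of dropping it silently. The error path marks the request as dead (-1) so later chunks for it are skipped, and the bookkeeping entry is removed once the final chunk arrives. A small self-contained sketch of that behavior with pyzmq (paths, payloads, and the logging call are placeholders):

    import zmq

    ctx = zmq.Context(4)                        # 4 I/O threads, as in the patch
    router = ctx.socket(zmq.ROUTER)
    router.setsockopt(zmq.SNDHWM, 10000)        # send high-water mark; value is illustrative
    router.setsockopt(zmq.ROUTER_MANDATORY, 1)  # unroutable sends raise instead of dropping
    router.bind("ipc:///tmp/example_router.ipc")

    req_dict = {}  # req_id -> client identity, or -1 once the client is unreachable

    def send_result(req_id: bytes, payload: bytes, finished: bool):
        if req_dict.get(req_id) == -1:
            # Client already gone: drop the chunk, forget the request on the last one.
            if finished:
                req_dict.pop(req_id, None)
            return
        try:
            router.send_multipart([req_dict[req_id], b"", payload])
        except zmq.ZMQError as e:
            print(f"[{req_id!r}] zmq error: {e}")
            req_dict[req_id] = -1               # remember the dead client for later chunks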