
Commit bca8905

[BugFix] fix control signal release failed (#3390)
* [BugFix] fix control signal release failed
* [BugFix] fix control signal release failed
* update
* update
* update
1 parent 8b12c80 commit bca8905
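
The "control signal" here is the concurrency semaphore that admits requests into the engine. Before this change, error and timeout paths could release a permit that was never acquired, or return without releasing one at all. The diffs below converge on one shape: acquire before any per-request work, release exactly once on the way out. A minimal sketch of that shape, using a plain `asyncio.Semaphore` (FastDeploy's own wrapper additionally exposes the `status()` helper seen in the new log lines; `handler` is a placeholder):

```python
import asyncio

async def admit_and_run(sem: asyncio.Semaphore, handler, max_waiting_time: float):
    # Gate admission first, before any preprocessing that might fail.
    try:
        if max_waiting_time < 0:
            await sem.acquire()  # negative value means "wait indefinitely"
        else:
            await asyncio.wait_for(sem.acquire(), timeout=max_waiting_time)
    except asyncio.TimeoutError:
        # wait_for cancels the pending acquire on timeout, so no permit is
        # held here and nothing may be released; just report the timeout.
        raise
    try:
        return await handler()
    finally:
        sem.release()  # exactly one release per successful acquire
```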

File tree: 4 files changed (+58, -48 lines)


fastdeploy/entrypoints/openai/api_server.py

Lines changed: 6 additions & 4 deletions
```diff
@@ -175,10 +175,10 @@ async def connection_manager():
         await asyncio.wait_for(connection_semaphore.acquire(), timeout=0.001)
         yield
     except asyncio.TimeoutError:
-        api_server_logger.info(f"Reach max request release: {connection_semaphore.status()}")
-        if connection_semaphore.locked():
-            connection_semaphore.release()
-        raise HTTPException(status_code=429, detail="Too many requests")
+        api_server_logger.info(f"Reach max request concurrency, semaphore status: {connection_semaphore.status()}")
+        raise HTTPException(
+            status_code=429, detail=f"Too many requests,current max concurrency is {args.max_concurrency}"
+        )
 
 
 # TODO: pass real engine values; fetch status via pid
@@ -265,9 +265,11 @@ async def create_chat_completion(request: ChatCompletionRequest):
     inject_to_metadata(request)
     generator = await app.state.chat_handler.create_chat_completion(request)
     if isinstance(generator, ErrorResponse):
+        api_server_logger.debug(f"release: {connection_semaphore.status()}")
         connection_semaphore.release()
         return JSONResponse(content={"detail": generator.model_dump()}, status_code=generator.code)
     elif isinstance(generator, ChatCompletionResponse):
+        api_server_logger.debug(f"release: {connection_semaphore.status()}")
         connection_semaphore.release()
         return JSONResponse(content=generator.model_dump())
     else:
```
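
The point of dropping `release()` from the timeout branch: a timed-out `acquire` holds no permit, so releasing there hands back a permit the request never took and quietly raises the effective concurrency cap. A standalone illustration with a plain `asyncio.Semaphore` (not FastDeploy's wrapper):

```python
import asyncio

async def main():
    sem = asyncio.Semaphore(1)
    await sem.acquire()  # the one permit is now held

    try:
        await asyncio.wait_for(sem.acquire(), timeout=0.001)
    except asyncio.TimeoutError:
        # The old code released here. asyncio.Semaphore is unbounded, so
        # that release would mint a second permit out of thin air and two
        # requests could run where only one was allowed.
        pass

    sem.release()  # correct: release only the permit actually held

asyncio.run(main())
```

Using `asyncio.BoundedSemaphore` instead would turn such a stray release into a `ValueError`, which is one way to surface this class of bug early.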

fastdeploy/entrypoints/openai/serving_chat.py

Lines changed: 34 additions & 34 deletions
```diff
@@ -78,47 +78,47 @@ async def create_chat_completion(self, request: ChatCompletionRequest):
             err_msg = f"Only master node can accept completion request, please send request to master node: {self.pod_ips[0]}"
             api_server_logger.error(err_msg)
             return ErrorResponse(message=err_msg, code=400)
-
-        if request.user is not None:
-            request_id = f"chatcmpl-{request.user}-{uuid.uuid4()}"
-        else:
-            request_id = f"chatcmpl-{uuid.uuid4()}"
-        api_server_logger.info(f"create chat completion request: {request_id}")
-        text_after_process = None
         try:
-            current_req_dict = request.to_dict_for_infer(request_id)
-            if "chat_template" not in current_req_dict:
-                current_req_dict["chat_template"] = self.chat_template
-            current_req_dict["arrival_time"] = time.time()
-            prompt_token_ids = self.engine_client.format_and_add_data(current_req_dict)
-            text_after_process = current_req_dict.get("text_after_process")
-            if isinstance(prompt_token_ids, np.ndarray):
-                prompt_token_ids = prompt_token_ids.tolist()
-        except Exception as e:
-            return ErrorResponse(code=400, message=str(e))
-
-        del current_req_dict
-        try:
-            api_server_logger.debug(f"{self.engine_client.semaphore.status()}")
             if self.max_waiting_time < 0:
                 await self.engine_client.semaphore.acquire()
             else:
                 await asyncio.wait_for(self.engine_client.semaphore.acquire(), timeout=self.max_waiting_time)
-        except Exception:
-            return ErrorResponse(code=408, message=f"Request queued time exceed {self.max_waiting_time}")
+            api_server_logger.info(f"current {self.engine_client.semaphore.status()}")
 
-        if request.stream:
-            return self.chat_completion_stream_generator(
-                request, request_id, request.model, prompt_token_ids, text_after_process
-            )
-        else:
+            if request.user is not None:
+                request_id = f"chatcmpl-{request.user}-{uuid.uuid4()}"
+            else:
+                request_id = f"chatcmpl-{uuid.uuid4()}"
+            api_server_logger.info(f"create chat completion request: {request_id}")
+            text_after_process = None
             try:
-                return await self.chat_completion_full_generator(
-                    request, request_id, request.model, prompt_token_ids, text_after_process
-                )
+                current_req_dict = request.to_dict_for_infer(request_id)
+                if "chat_template" not in current_req_dict:
+                    current_req_dict["chat_template"] = self.chat_template
+                current_req_dict["arrival_time"] = time.time()
+                prompt_token_ids = self.engine_client.format_and_add_data(current_req_dict)
+                text_after_process = current_req_dict.get("text_after_process")
+                if isinstance(prompt_token_ids, np.ndarray):
+                    prompt_token_ids = prompt_token_ids.tolist()
             except Exception as e:
                 return ErrorResponse(code=400, message=str(e))
 
+            del current_req_dict
+
+            if request.stream:
+                return self.chat_completion_stream_generator(
+                    request, request_id, request.model, prompt_token_ids, text_after_process
+                )
+            else:
+                try:
+                    return await self.chat_completion_full_generator(
+                        request, request_id, request.model, prompt_token_ids, text_after_process
+                    )
+                except Exception as e:
+                    return ErrorResponse(code=400, message=str(e))
+        except Exception:
+            return ErrorResponse(code=408, message=f"Request queued time exceed {self.max_waiting_time}")
+
     def _create_streaming_error_response(self, message: str) -> str:
         error_response = ErrorResponse(
             code=400,
@@ -254,6 +254,7 @@ async def chat_completion_stream_generator(
                         logprobs_res = self._create_chat_logprobs(
                             output_top_logprobs, request.logprobs, request.top_logprobs
                         )
+
                     if self.engine_client.data_processor.tool_parser_obj and not res["finished"]:
                         tool_delta_message = output["tool_delta_message"]
                         if tool_delta_message is None:
@@ -277,7 +278,6 @@ async def chat_completion_stream_generator(
                         logprobs=logprobs_res,
                         arrival_time=arrival_time,
                     )
-
                     if res["finished"]:
                         num_choices -= 1
                         work_process_metrics.e2e_request_latency.observe(
@@ -309,7 +309,6 @@ async def chat_completion_stream_generator(
                 if len(choices) == max_streaming_response_tokens or res["finished"]:
                     chunk.choices = choices
                     yield f"data: {chunk.model_dump_json(exclude_unset=True)}\n\n"
-                    # print the last chunk
                 if res["finished"]:
                     api_server_logger.info(f"Chat Streaming response last send: {chunk.model_dump_json()}")
                 choices = []
@@ -417,8 +416,9 @@ async def chat_completion_full_generator(
                 if task_is_finished:
                     break
         finally:
-            self.engine_client.semaphore.release()
             dealer.close()
+            self.engine_client.semaphore.release()
+            api_server_logger.info(f"release {self.engine_client.semaphore.status()}")
 
         choices = []
         output = final_res["outputs"]
```
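
The reordered `finally` block is the heart of the fix for the non-streaming path: the per-request ZMQ dealer is torn down before the permit is returned, so a newly admitted request can never race with the previous request's still-open socket. A condensed sketch of that lifecycle (the `connect_dealer`/`recv` helpers are placeholders, not FastDeploy's real API):

```python
async def full_generator_lifecycle(engine_client, request_id: str):
    dealer = await engine_client.connect_dealer(request_id)  # placeholder
    try:
        while True:
            result = await dealer.recv()  # placeholder receive loop
            if result.finished:
                break
    finally:
        # Order matters: close the socket first, then return the permit.
        # Releasing first (the old order) lets the next request start while
        # this socket and its router-side identity entry still exist.
        dealer.close()
        engine_client.semaphore.release()
```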

fastdeploy/entrypoints/openai/serving_completion.py

Lines changed: 8 additions & 8 deletions
```diff
@@ -101,6 +101,14 @@ async def create_completion(self, request: CompletionRequest):
         api_server_logger.info(f"start inference for request {num_choices}")
         prompt_batched_token_ids = []
         text_after_process_list = []
+        try:
+            if self.max_waiting_time < 0:
+                await self.engine_client.semaphore.acquire()
+            else:
+                await asyncio.wait_for(self.engine_client.semaphore.acquire(), timeout=self.max_waiting_time)
+        except Exception:
+            return ErrorResponse(code=408, message=f"Request queued time exceed {self.max_waiting_time}")
+
         try:
             for idx, prompt in enumerate(request_prompts):
                 request_id_idx = f"{request_id}-{idx}"
@@ -117,14 +125,6 @@ async def create_completion(self, request: CompletionRequest):
 
             del current_req_dict
 
-        try:
-            if self.max_waiting_time < 0:
-                await self.engine_client.semaphore.acquire()
-            else:
-                await asyncio.wait_for(self.engine_client.semaphore.acquire(), timeout=self.max_waiting_time)
-        except Exception:
-            return ErrorResponse(code=408, message=f"Request queued time exceed {self.max_waiting_time}")
-
         if request.stream:
             return self.completion_stream_generator(
                 request=request,
```
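
`create_completion` now uses the same admission gate as the chat handler, and it runs before the per-prompt loop. Judging by the diff, `format_and_add_data` submits each prompt to the engine during preprocessing, so in the old order a queue timeout returned 408 after data had already been handed over; with the gate first, a rejected request has no side effects. The timeout semantics in isolation (illustrative values):

```python
import asyncio

async def demo():
    sem = asyncio.Semaphore(1)
    await sem.acquire()  # saturate the gate

    # A second caller with max_waiting_time = 0.05 queues briefly and then
    # times out; the cancelled acquire leaves the permit count untouched,
    # and the handler maps the timeout to ErrorResponse(code=408, ...).
    try:
        await asyncio.wait_for(sem.acquire(), timeout=0.05)
    except asyncio.TimeoutError:
        print("Request queued time exceed 0.05")

asyncio.run(demo())
```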

fastdeploy/inter_communicator/zmq_client.py

Lines changed: 10 additions & 2 deletions
```diff
@@ -31,7 +31,7 @@ class ZmqClient:
     """
 
     def __init__(self, name, mode):
-        self.context = zmq.Context()
+        self.context = zmq.Context(4)
         self.socket = self.context.socket(mode)
         self.file_name = f"/dev/shm/{name}.socket"
         self.router_path = f"/dev/shm/router_{name}.ipc"
@@ -67,6 +67,7 @@ def create_router(self):
         """
         self.router = self.context.socket(zmq.ROUTER)
         self.router.setsockopt(zmq.SNDHWM, self.ZMQ_SNDHWM)
+        self.router.setsockopt(zmq.ROUTER_MANDATORY, 1)
         self.router.setsockopt(zmq.SNDTIMEO, -1)
         self.router.bind(f"ipc://{self.router_path}")
 
@@ -125,6 +126,11 @@ def send_multipart(self, req_id, data):
             else:
                 break
 
+        if self.req_dict[req_id] == -1:
+            if data[-1].finished:
+                with self.mutex:
+                    self.req_dict.pop(req_id, None)
+            return
         try:
             start_send = time.time()
             if self.aggregate_send:
@@ -133,7 +139,9 @@ def send_multipart(self, req_id, data):
                 result = msgpack.packb([response.to_dict() for response in data])
             self.router.send_multipart([self.req_dict[req_id], b"", result])
             llm_logger.debug(f"send_multipart result: {req_id} len {len(data)} elapse: {time.time()-start_send}")
-
+        except zmq.ZMQError as e:
+            llm_logger.error(f"[{req_id}] zmq error: {e}")
+            self.req_dict[req_id] = -1
         except Exception as e:
             llm_logger.error(f"Send result to zmq client failed: {e}")
```
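
`zmq.Context(4)` raises the context's I/O thread count from the default of 1 to 4. `ROUTER_MANDATORY` changes failure semantics: by default a ROUTER socket silently drops messages addressed to an unknown peer identity, while with the option set the send raises `ZMQError` (`EHOSTUNREACH`). That error is what lets `send_multipart` mark a dead client (`req_dict[req_id] = -1`) and clean its entry up once the final chunk arrives, instead of losing replies silently. The option's behavior in isolation:

```python
import zmq

ctx = zmq.Context()
router = ctx.socket(zmq.ROUTER)
router.setsockopt(zmq.ROUTER_MANDATORY, 1)
router.bind("ipc:///tmp/demo_router.ipc")

try:
    # b"ghost" is not a connected peer identity: with ROUTER_MANDATORY
    # unset this send would be dropped silently; with it set, it raises.
    router.send_multipart([b"ghost", b"", b"payload"])
except zmq.ZMQError as e:
    print(f"unroutable peer: {e}")  # e.errno == zmq.EHOSTUNREACH
finally:
    router.close()
    ctx.term()
```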
