Skip to content

Commit a84a98b

Browse files
authored
fix scheduler bug due to async running (#3293)
1 parent c208086 commit a84a98b

File tree

2 files changed

+28
-6
lines changed

2 files changed

+28
-6
lines changed

fastdeploy/engine/sched/resource_manager_v1.py

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ def __init__(self, max_num_seqs, config, tensor_parallel_size, splitwise_role, l
5656
self.running: list[Request] = []
5757
self.finish_execution_pool = ThreadPoolExecutor(max_workers=1)
5858
self.lock = threading.Lock()
59+
self.to_be_rescheduled_request_id_set = set()
5960

6061
def allocated_slots(self, request: Request):
6162
return len(request.block_tables) * self.config.cache_config.block_size
@@ -76,6 +77,13 @@ def _prepare_decode_task(self, request):
7677

7778
def _prepare_preempt_task(self, request):
    """Build the scheduler task that preempts *request* at its batch slot."""
    return ScheduledPreemptTask(
        idx=request.idx,
        request_id=request.request_id,
    )
80+
81+
def reschedule_preempt_task(self, request_id):
    """Move a previously preempted request back to the front of the waiting queue.

    Called (e.g. from the token processor) once it is known that no more
    tokens will be produced for the preempted request. Thread-safe via
    ``self.lock`` because it races with ``add_request``/``finish_requests``.

    Fix over the original: the id is removed from
    ``to_be_rescheduled_request_id_set`` even when the request is no longer
    tracked in ``self.requests`` (e.g. it was finished and deleted in the
    meantime). The original only removed it when both conditions held, so a
    stale id could stay in the set forever and be retried on every batch.
    """
    with self.lock:
        if request_id not in self.to_be_rescheduled_request_id_set:
            return  # nothing pending for this id
        self.to_be_rescheduled_request_id_set.remove(request_id)
        request = self.requests.get(request_id)
        if request is not None:
            # Preempted requests jump the queue so they resume first.
            self.waiting.appendleft(request)
7987

8088
def _trigger_preempt(self, request, num_new_blocks, preempted_reqs, scheduled_reqs):
8189
can_schedule = True
@@ -85,7 +93,7 @@ def _trigger_preempt(self, request, num_new_blocks, preempted_reqs, scheduled_re
8593
preempted_req.status = RequestStatus.PREEMPTED
8694
preempted_req.num_computed_tokens = 0
8795
self._free_blocks(preempted_req)
88-
self.waiting.appendleft(preempted_req)
96+
self.to_be_rescheduled_request_id_set.add(preempted_req.request_id)
8997
preempted_reqs.append(preempted_req)
9098
scheduled_reqs.append(self._prepare_preempt_task(preempted_req))
9199
if preempted_req == request:
@@ -308,8 +316,9 @@ def get_real_bsz(self) -> int:
308316
return self.real_bsz
309317

310318
def add_request(self, request: Request) -> None:
    """Register *request* with the scheduler.

    Indexes the request by its id and appends it to the waiting queue.
    Holds ``self.lock`` so registration cannot interleave with concurrent
    preemption rescheduling or request finishing.
    """
    with self.lock:
        self.requests[request.request_id] = request
        self.waiting.append(request)
313322

314323
def _free_blocks(self, request: Request):
315324
self.cache_manager.recycle_gpu_blocks(request.block_tables)
@@ -331,9 +340,15 @@ def finish_requests(self, request_ids: Union[str, Iterable[str]]):
331340
if request is None:
332341
# Invalid request ID.
333342
continue
334-
request.status = RequestStatus.FINISHED
335-
self.running.remove(request)
336-
self._free_blocks(request)
343+
if request in self.running: # normally run and finished
344+
self.running.remove(request)
345+
request.status = RequestStatus.FINISHED
346+
self._free_blocks(request)
347+
if request.request_id in self.to_be_rescheduled_request_id_set: # finished after preempted, blocks have been recycled.
348+
self.to_be_rescheduled_request_id_set.remove(request.request_id) # just remove from to_be_rescheduled_request_id_set
349+
if request in self.waiting: # after finished, this request still scheduled from preempted to waiting, unexpected error, should not be here
350+
raise RuntimeError(f"request {request.request_id} scheduled into waiting list, after finished")
351+
337352
self.tasks_list[request.idx] = None
338353
self.stop_flags[request.idx] = True
339354
del self.requests[req_id]

fastdeploy/output/token_processor.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -432,6 +432,11 @@ def _process_batch_output(self):
432432
tokens = tokens[2 : batch + 2]
433433

434434
batch_result = list()
435+
if envs.ENABLE_V1_KVCACHE_SCHEDULER:
436+
need_to_be_reschedule_req_ids = list(self.resource_manager.to_be_rescheduled_request_id_set)
437+
for request_id in need_to_be_reschedule_req_ids:
438+
if self.resource_manager.requests[request_id].idx >= (batch - 1): # No more token generated for preempted request
439+
self.resource_manager.reschedule_preempt_task(request_id)
435440
for i in range(batch):
436441
if self.resource_manager.stop_flags[i]:
437442
continue
@@ -458,6 +463,8 @@ def _process_batch_output(self):
458463
if recovery_stop:
459464
llm_logger.info(f"recovery stop signal found at task {task_id}")
460465
if not recovery_stop and token_id < 0:
466+
if task_id in self.resource_manager.to_be_rescheduled_request_id_set:
467+
self.resource_manager.reschedule_preempt_task(task_id)
461468
continue
462469

463470
if task.get("prefill_chunk_info", None) is not None:

0 commit comments

Comments
 (0)