add error traceback info #3419

Merged (9 commits, Aug 19, 2025)
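
This PR applies one pattern throughout the codebase: every except block that previously logged only the exception message now also logs the formatted stack trace, so failures in background threads and request handlers can be located without reproducing them. A minimal standalone sketch of the pattern (hypothetical `demo` logger and `risky` helper, not code from this repo):

    import logging
    import traceback

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger("demo")

    def risky():
        return 1 / 0  # stand-in for cache-transfer or request-handling work

    try:
        risky()
    except Exception as e:
        # Before: only the message, with no indication of where it was raised.
        # logger.error(f"do_data_transfer: error: {e}")
        # After: the message plus the full traceback of the active exception.
        logger.error(f"do_data_transfer: error: {e}, {traceback.format_exc()}")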
3 changes: 2 additions & 1 deletion fastdeploy/cache_manager/cache_messager.py
@@ -17,6 +17,7 @@
 import math
 import threading
 import time
+import traceback

 import numpy as np
 import paddle
@@ -309,4 +310,4 @@ def _prefill_layerwise_send_cache_thread(self):
                     self.last_layer_idx = prefilled_layer_idx

             except Exception as e:
-                logger.error(f"prefill layerwise send cache thread has exception: {e}")
+                logger.error(f"prefill layerwise send cache thread has exception: {e}, {str(traceback.format_exc())}")
3 changes: 2 additions & 1 deletion fastdeploy/cache_manager/cache_transfer_manager.py
@@ -19,6 +19,7 @@
 import json
 import queue
 import time
+import traceback

 import numpy as np
 import paddle
@@ -342,7 +343,7 @@ def do_data_transfer(self):
                 if self.rank == 0:
                     self.cache_task_queue.barrier3.reset()
             except Exception as e:
-                logger.info(f"do_data_transfer: error: {e}")
+                logger.info(f"do_data_transfer: error: {e}, {str(traceback.format_exc())}")

     def _transfer_data(
         self,
13 changes: 7 additions & 6 deletions fastdeploy/cache_manager/prefix_cache_manager.py
@@ -20,6 +20,7 @@
 import sys
 import threading
 import time
+import traceback
 import uuid
 from collections import defaultdict
 from concurrent.futures import ThreadPoolExecutor
@@ -469,7 +470,7 @@ def update_cache_blocks(self, task, block_size):
                 self.leaf_req_map[leaf_node].add(req_id)
             self.cache_info[req_id] = (leaf_node, input_ids)
         except Exception as e:
-            logger.error(f"update_cache_blocks, error: {type(e)} {e}")
+            logger.error(f"update_cache_blocks, error: {type(e)} {e}, {str(traceback.format_exc())}")
             raise e

     def request_match_blocks(self, task, block_size, *args):
@@ -555,7 +556,7 @@ def request_match_blocks(self, task, block_size, *args):
             )
             return common_block_ids, matched_token_num, hit_info
         except Exception as e:
-            logger.error(f"request_block_ids: error: {type(e)} {e}")
+            logger.error(f"request_match_blocks: error: {type(e)} {e}, {str(traceback.format_exc())}")
             raise e

     def request_block_ids(self, task, block_size, dec_token_num, *args):
@@ -660,7 +661,7 @@ def request_block_ids(self, task, block_size, dec_token_num, *args):
             )
             return common_block_ids, unique_block_ids, hit_info
         except Exception as e:
-            logger.error(f"request_block_ids: error: {type(e)} {e}")
+            logger.error(f"request_block_ids: error: {type(e)} {e}, {str(traceback.format_exc())}")
             raise e

     def release_block_ids_async(self, task):
@@ -709,7 +710,7 @@ def release_block_ids(self, task):
             )
             return
         except Exception as e:
-            logger.error(f"release_block_ids: error: {type(e)} {e}")
+            logger.error(f"release_block_ids: error: {type(e)} {e}, {str(traceback.format_exc())}")
             raise e

     def _handle_free_gpu_node_without_cpu(self, node):
@@ -899,7 +900,7 @@ def free_block_ids_async(self, need_block_num):
             else:
                 self.gpu_free_task_future = None
         except Exception as e:
-            logger.error(f"free_block_ids_async: error: {type(e)} {e}")
+            logger.error(f"free_block_ids_async: error: {type(e)} {e}, {str(traceback.format_exc())}")
             raise e

     def free_cpu_block_ids(self, need_block_num):
@@ -1218,5 +1219,5 @@ def recv_data_transfer_result(self):
                     + f"task_cpu_block_id {task_cpu_block_id} event_type {event_type} done"
                 )
             except Exception as e:
-                logger.warning(f"recv_data_transfer_result: error: {e}")
+                logger.warning(f"recv_data_transfer_result: error: {e}, {str(traceback.format_exc())}")
                 raise e
10 changes: 6 additions & 4 deletions fastdeploy/engine/engine.py
@@ -597,7 +597,7 @@ def receiver_loop():
                     time.sleep(0.001)

                 except Exception as e:
-                    llm_logger.error(f"Error in main loop: {e}")
+                    llm_logger.error(f"Error in main loop: {e}, {str(traceback.format_exc())}")
                     time.sleep(0.1)

         threading.Thread(target=receiver_loop, daemon=True).start()
@@ -985,7 +987,9 @@ def _exit_sub_services(self):
                 try:
                     os.killpg(p.pid, signal.SIGTERM)
                 except Exception as e:
-                    print(f"Error extracting file: {e}")
+                    console_logger.error(
+                        f"Error killing cache manager process {p.pid}: {e}, {str(traceback.format_exc())}"
+                    )
         self.worker_ready_signal.clear()
         self.exist_task_signal.clear()
         self.exist_swapped_task_signal.clear()
@@ -998,7 +1000,7 @@ def _exit_sub_services(self):
             try:
                 os.killpg(self.worker_proc.pid, signal.SIGTERM)
             except Exception as e:
-                print(f"Error extracting sub services: {e}")
+                console_logger.error(f"Error killing worker process: {e}, {str(traceback.format_exc())}")

         self.engine_worker_queue.cleanup()
         if hasattr(self, "zmq_server") and self.zmq_server is not None:
try:
req_id = self._format_and_add_data(prompts)
except Exception as e:
llm_logger.error(f"Error happend while adding request, details={e}")
llm_logger.error(f"Error happend while adding request, details={e}, {str(traceback.format_exc())}")
raise EngineError(str(e), error_code=400)

# Get the result of the current request
Expand Down
4 changes: 2 additions & 2 deletions fastdeploy/engine/expert_service.py
@@ -269,7 +269,7 @@ def receiver_loop():
                     time.sleep(0.001)
                     continue
             except Exception as e:
-                llm_logger.error(f"get decode tasks error: {e}")
+                llm_logger.error(f"get decode tasks error: {e}, {str(traceback.format_exc())}")

         threading.Thread(target=receiver_loop, daemon=True).start()

@@ -378,4 +378,4 @@ def start_expert_service(cfg, local_data_parallel_id, ipc_signal_suffix):
         expert_service.start(ipc_signal_suffix, local_data_parallel_id)
         expert_service.split_connector.start_receiver()
     except Exception as e:
-        llm_logger.exception(f"Expert service failed to start: {e}")
+        llm_logger.exception(f"Expert service failed to start: {e}, {str(traceback.format_exc())}")
5 changes: 3 additions & 2 deletions fastdeploy/engine/sched/resource_manager_v1.py
@@ -16,6 +16,7 @@

 import threading
 import time
+import traceback
 from collections import deque
 from collections.abc import Iterable
 from concurrent.futures import ThreadPoolExecutor
@@ -389,7 +390,7 @@ def get_prefix_cached_blocks(self, request: Request):
             request.cache_prepare_time = time.time() - cache_prepare_time
             return True
         except Exception as e:
-            llm_logger.error(f"prefix match blocks error: {e}, waiting reschedule...")
+            llm_logger.error(f"prefix match blocks error: {e}, {str(traceback.format_exc())}, waiting reschedule...")
             return False

     def add_request(self, request: Request) -> None:
@@ -441,4 +442,4 @@ def finish_requests(self, request_ids: Union[str, Iterable[str]]):
                 self.stop_flags[request.idx] = True
                 del self.requests[req_id]
         except Exception as e:
-            llm_logger.error(e)
+            llm_logger.error(f"finish_requests error: {e}, {str(traceback.format_exc())}")
3 changes: 2 additions & 1 deletion fastdeploy/entrypoints/api_server.py
@@ -15,6 +15,7 @@
 """

 import json
+import traceback

 import uvicorn
 from fastapi import FastAPI
@@ -114,7 +115,7 @@ def launch_api_server(args) -> None:
             log_level="info",
         )  # set log level to error to avoid log
     except Exception as e:
-        api_server_logger.error(f"launch sync http server error, {e}")
+        api_server_logger.error(f"launch sync http server error, {e}, {str(traceback.format_exc())}")


 def main():
5 changes: 3 additions & 2 deletions fastdeploy/entrypoints/engine_client.py
@@ -15,6 +15,7 @@
 """

 import time
+import traceback
 import uuid

 import numpy as np
@@ -141,7 +142,7 @@ def add_requests(self, task):
             work_process_metrics.prompt_tokens_total.inc(input_ids_len)
             work_process_metrics.request_prompt_tokens.observe(input_ids_len)
         except Exception as e:
-            api_server_logger.error(e)
+            api_server_logger.error(f"add_requests error: {e}, {str(traceback.format_exc())}")
             raise EngineError(str(e), error_code=400)

         if input_ids_len + min_tokens >= self.max_model_len:
Expand Down Expand Up @@ -194,7 +195,7 @@ def add_requests(self, task):
else:
self.zmq_client.send_pyobj(task)
except Exception as e:
api_server_logger.error(e)
api_server_logger.error(f"zmq_client send task error: {e}, {str(traceback.format_exc())}")
raise EngineError(str(e), error_code=400)

def vaild_parameters(self, data):
Expand Down
2 changes: 1 addition & 1 deletion fastdeploy/entrypoints/llm.py
@@ -346,7 +346,7 @@ def _build_sample_logprobs(self, logprobs_lists: LogprobsLists, topk_logprobs: i
             return result

         except Exception as e:
-            llm_logger.error(f"Error building sample logprobs from LogprobsLists: {e}")
+            llm_logger.error(f"Error building sample logprobs from LogprobsLists: {e}, {str(traceback.format_exc())}")

     def _run_engine(self, req_ids: list[str], use_tqdm: bool, topk_logprobs: Optional[int] = None):
         """
5 changes: 3 additions & 2 deletions fastdeploy/entrypoints/openai/api_server.py
@@ -18,6 +18,7 @@
 import os
 import threading
 import time
+import traceback
 from collections.abc import AsyncGenerator
 from contextlib import asynccontextmanager
 from multiprocessing import current_process
@@ -159,7 +160,7 @@ async def lifespan(app: FastAPI):
         multiprocess.mark_process_dead(os.getpid())
         api_server_logger.info(f"Closing metrics client pid: {pid}")
     except Exception as e:
-        api_server_logger.warning(e)
+        api_server_logger.warning(f"exit error: {e}, {str(traceback.format_exc())}")


 app = FastAPI(lifespan=lifespan)
@@ -355,7 +356,7 @@ def launch_api_server() -> None:
             log_level="info",
         )  # set log level to error to avoid log
     except Exception as e:
-        api_server_logger.error(f"launch sync http server error, {e}")
+        api_server_logger.error(f"launch sync http server error, {e}, {str(traceback.format_exc())}")


 metrics_app = FastAPI()
26 changes: 19 additions & 7 deletions fastdeploy/entrypoints/openai/serving_chat.py
@@ -101,7 +101,9 @@ async def create_chat_completion(self, request: ChatCompletionRequest):
             if isinstance(prompt_token_ids, np.ndarray):
                 prompt_token_ids = prompt_token_ids.tolist()
         except Exception as e:
-            return ErrorResponse(code=400, message=str(e))
+            error_msg = f"request[{request_id}] generator error: {str(e)}, {str(traceback.format_exc())}"
+            api_server_logger.error(error_msg)
+            return ErrorResponse(code=400, message=error_msg)

         del current_req_dict

@@ -115,11 +117,19 @@ async def create_chat_completion(self, request: ChatCompletionRequest):
                     request, request_id, request.model, prompt_token_ids, text_after_process
                 )
             except Exception as e:
-                return ErrorResponse(code=400, message=str(e))
-        except Exception:
-            return ErrorResponse(code=408, message=f"Request queued time exceed {self.max_waiting_time}")
+                error_msg = f"request[{request_id}] full generator error: {str(e)}, {str(traceback.format_exc())}"
+                api_server_logger.error(error_msg)
+                return ErrorResponse(code=408, message=error_msg)
+        except Exception as e:
+            error_msg = (
+                f"request[{request_id}] waiting error: {str(e)}, {str(traceback.format_exc())}, "
+                f"max waiting time: {self.max_waiting_time}"
+            )
+            api_server_logger.error(error_msg)
+            return ErrorResponse(code=408, message=error_msg)

     def _create_streaming_error_response(self, message: str) -> str:
+        api_server_logger.error(message)
         error_response = ErrorResponse(
             code=400,
             message=message,
@@ -336,7 +346,9 @@ async def chat_completion_stream_generator(
                 yield f"data: {chunk.model_dump_json(exclude_unset=True)}\n\n"

         except Exception as e:
-            error_data = self._create_streaming_error_response(str(e))
+            error_data = self._create_streaming_error_response(
+                f"request[{request_id}] generate stream error: {str(e)}, {str(traceback.format_exc())}"
+            )
             yield f"data: {error_data}\n\n"
         finally:
             dealer.close()
@@ -556,6 +568,6 @@ def _build_logprobs_response(
             return LogProbs(content=[sampled_entry])

         except Exception as e:
-            api_server_logger.error("Error in _build_logprobs_response: %s", e)
-            api_server_logger.error(traceback.format_exc())
+            error_msg = f"Error in _build_logprobs_response: {e}, {str(traceback.format_exc())}"
+            api_server_logger.error(error_msg)
             return None
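
The streaming path above differs from the non-streaming handlers by design: once chat_completion_stream_generator has started yielding, the HTTP status line is already on the wire, so errors cannot be returned as an ErrorResponse and are instead serialized into a final server-sent-events frame. A sketch of the frame shape (a hypothetical helper mirroring _create_streaming_error_response; the code/message fields are assumed from the ErrorResponse usage above):

    import json

    def streaming_error_frame(message: str, code: int = 400) -> str:
        # Serialize the error and wrap it in an SSE data frame, since the
        # response status can no longer be changed mid-stream.
        payload = json.dumps({"code": code, "message": message})
        return f"data: {payload}\n\n"

    print(streaming_error_frame("request[abc123] generate stream error: ..."))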
29 changes: 23 additions & 6 deletions fastdeploy/entrypoints/openai/serving_completion.py
@@ -16,6 +16,7 @@

 import asyncio
 import time
+import traceback
 import uuid
 from typing import List, Optional

@@ -92,7 +93,9 @@ async def create_completion(self, request: CompletionRequest):
             else:
                 raise ValueError("Prompt must be a string, a list of strings or a list of integers.")
         except Exception as e:
-            return ErrorResponse(message=str(e), code=400)
+            error_msg = f"OpenAIServingCompletion create_completion: {e}, {str(traceback.format_exc())}"
+            api_server_logger.error(error_msg)
+            return ErrorResponse(message=error_msg, code=400)

         if request_prompt_ids is not None:
             request_prompts = request_prompt_ids
@@ -106,8 +109,13 @@ async def create_completion(self, request: CompletionRequest):
                 await self.engine_client.semaphore.acquire()
             else:
                 await asyncio.wait_for(self.engine_client.semaphore.acquire(), timeout=self.max_waiting_time)
-        except Exception:
-            return ErrorResponse(code=408, message=f"Request queued time exceed {self.max_waiting_time}")
+        except Exception as e:
+            error_msg = (
+                f"OpenAIServingCompletion waiting error: {e}, {str(traceback.format_exc())}, "
+                f"max waiting time: {self.max_waiting_time}"
+            )
+            api_server_logger.error(error_msg)
+            return ErrorResponse(code=408, message=error_msg)

         try:
             for idx, prompt in enumerate(request_prompts):
@@ -121,6 +129,8 @@ async def create_completion(self, request: CompletionRequest):
                     text_after_process_list.append(current_req_dict.get("text_after_process"))
                     prompt_batched_token_ids.append(prompt_token_ids)
                 except Exception as e:
+                    error_msg = f"OpenAIServingCompletion format error: {e}, {str(traceback.format_exc())}"
+                    api_server_logger.error(error_msg)
                     return ErrorResponse(message=str(e), code=400)

             del current_req_dict
@@ -147,10 +157,16 @@ async def create_completion(self, request: CompletionRequest):
                     text_after_process_list=text_after_process_list,
                 )
             except Exception as e:
-                return ErrorResponse(code=400, message=str(e))
+                error_msg = (
+                    f"OpenAIServingCompletion completion_full_generator error: {e}, {str(traceback.format_exc())}"
+                )
+                api_server_logger.error(error_msg)
+                return ErrorResponse(code=400, message=error_msg)

         except Exception as e:
-            return ErrorResponse(message=str(e), code=400)
+            error_msg = f"OpenAIServingCompletion create_completion error: {e}, {str(traceback.format_exc())}"
+            api_server_logger.error(error_msg)
+            return ErrorResponse(message=error_msg, code=400)

     async def completion_full_generator(
         self,
@@ -431,6 +447,7 @@ async def completion_stream_generator(
                         choices = []

         except Exception as e:
+            api_server_logger.error(f"Error in completion_stream_generator: {e}, {str(traceback.format_exc())}")
             yield f"data: {ErrorResponse(message=str(e), code=400).model_dump_json(exclude_unset=True)}\n\n"
         finally:
             del request
@@ -614,5 +631,5 @@ def _build_logprobs_response(
             )

         except Exception as e:
-            api_server_logger.error("Error in _build_logprobs_response: %s", e)
+            api_server_logger.error(f"Error in _build_logprobs_response: {str(e)}, {str(traceback.format_exc())}")
             return None
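
A closing observation: for call sites that own a plain logging.Logger, the same information can be attached with exc_info=True instead of interpolating format_exc() into the message; the logging module then formats the active traceback itself. A sketch of that alternative idiom:

    import logging

    logging.basicConfig(level=logging.ERROR)
    logger = logging.getLogger("demo")

    try:
        {}["missing"]  # raises KeyError
    except Exception as e:
        # exc_info=True appends the formatted traceback, equivalent to
        # embedding traceback.format_exc() in the message string.
        logger.error("add_requests error: %s", e, exc_info=True)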