From 0ff228907883466a0349905b59658c2c639fa0f6 Mon Sep 17 00:00:00 2001
From: "U-METAX-TECH\\kgao"
Date: Mon, 18 Aug 2025 18:32:44 +0800
Subject: [PATCH] adapt fastdeploy on metax gpu v2

1. adapt for the latest fastdeploy
2. add install docs
---
 custom_ops/setup_ops.py                       |   6 +
 docs/get_started/installation/metax_gpu.md    |  83 +++++++++
 docs/zh/get_started/installation/metax_gpu.md |  82 +++++++++
 fastdeploy/engine/config.py                   |   2 +-
 .../metax/attention/flash_attn_backend.py     |   4 +-
 .../moe/fused_moe_triton_metax_backend.py     |   7 +-
 .../backends/metax/moe/triton_moe_kernels.py  |   1 -
 fastdeploy/worker/metax_model_runner.py       | 169 +++++++++++-------
 fastdeploy/worker/metax_worker.py             | 162 +++++++++--------
 requirements_metaxgpu.txt                     |   1 +
 10 files changed, 368 insertions(+), 149 deletions(-)
 create mode 100644 docs/get_started/installation/metax_gpu.md
 create mode 100644 docs/zh/get_started/installation/metax_gpu.md

diff --git a/custom_ops/setup_ops.py b/custom_ops/setup_ops.py
index a94c22f486..744f564dce 100644
--- a/custom_ops/setup_ops.py
+++ b/custom_ops/setup_ops.py
@@ -589,6 +589,12 @@ def find_end_files(directory, end_str):
         if not os.listdir(json_dir):
             raise ValueError("Git clone nlohmann_json failed!")
         sources = [
+            "gpu_ops/update_inputs_v1.cu",
+            "gpu_ops/save_with_output_msg.cc",
+            "gpu_ops/get_output.cc",
+            "gpu_ops/get_output_msg_with_topk.cc",
+            "gpu_ops/save_output_msg_with_topk.cc",
+            "gpu_ops/transfer_output.cc",
             "gpu_ops/save_with_output.cc",
             "gpu_ops/set_mask_value.cu",
             "gpu_ops/set_value_by_flags.cu",
diff --git a/docs/get_started/installation/metax_gpu.md b/docs/get_started/installation/metax_gpu.md
new file mode 100644
index 0000000000..4cce719261
--- /dev/null
+++ b/docs/get_started/installation/metax_gpu.md
@@ -0,0 +1,83 @@
+# Metax GPU Installation for Running ERNIE 4.5 Series Models
+
+The following installation methods are available when your environment meets these requirements:
+- Python >= 3.10
+- Linux X86_64
+
+Before starting, prepare a machine equipped with MetaX C550 accelerator cards. Requirements:
+
+| Chip Type | Driver Version | KMD Version |
+| :---: | :---: | :---: |
+| MetaX C550 | 3.0.0.1 | 2.14.6 |
+
+## 1. Pre-built Docker Installation (Recommended)
+
+```shell
+docker login --username=cr_temp_user --password=eyJpbnN0YW5jZUlkIjoiY3JpLXpxYTIzejI2YTU5M3R3M2QiLCJ0aW1lIjoiMTc1NTUxODEwODAwMCIsInR5cGUiOiJzdWIiLCJ1c2VySWQiOiIyMDcwOTQwMTA1NjYzNDE3OTIifQ:8226ca50ce5476c42062e24d3c465545de1c1780 cr.metax-tech.com && docker pull cr.metax-tech.com/public-library/maca-native:3.0.0.4-ubuntu20.04-amd64
+```
+
+## 2. PaddlePaddle and Custom Device Installation
+
+```shell
+pip install paddlepaddle==3.0.0.dev20250729 -i https://www.paddlepaddle.org.cn/packages/nightly/cpu/
+pip install paddle-metax-gpu==3.0.0.dev20250807 -i https://www.paddlepaddle.org.cn/packages/nightly/maca/
+```
+
+## 3. Build Wheel from Source
+Clone the source code and build:
+```shell
+git clone https://github.com/PaddlePaddle/FastDeploy
+cd FastDeploy
+bash build.sh
+```
+The built packages will be in the `FastDeploy/dist` directory.
+
+## 4. Environment Verification
+
+After installation, verify the environment with this Python code:
+```python
+import paddle
+from paddle.jit.marker import unified
+# Verify GPU availability
+paddle.utils.run_check()
+# Verify FastDeploy custom operators compilation
+from fastdeploy.model_executor.ops.gpu import beam_search_softmax
+```
+
+If the above code executes successfully, the environment is ready.
+
+## 5. Demo
+```python
+from fastdeploy import LLM, SamplingParams
+
+prompts = [
+    "Hello. My name is",
+]
+
+sampling_params = SamplingParams(top_p=0.95, max_tokens=32, temperature=0.6)
+
+llm = LLM(model="/root/model/ERNIE-4.5-21B-A3B-Paddle", tensor_parallel_size=1, max_model_len=256, engine_worker_queue_port=9135, quantization='wint8', static_decode_blocks=0, gpu_memory_utilization=0.9)
+
+outputs = llm.generate(prompts, sampling_params)
+
+print(f"Generated {len(outputs)} outputs")
+print("=" * 50 + "\n")
+
+for output in outputs:
+    prompt = output.prompt
+    generated_text = output.outputs.text
+    print(prompt)
+    print(generated_text)
+    print("-" * 50)
+```
+
+Output:
+
+```
+INFO 2025-08-18 10:54:18,455 416822 engine.py[line:202] Waiting worker processes ready...
+Loading Weights: 100%|█████████████████████████| 100/100 [03:33<00:00, 2.14s/it]
+Loading Layers: 100%|██████████████████████████| 100/100 [00:18<00:00, 5.54it/s]
+INFO 2025-08-18 10:58:16,149 416822 engine.py[line:247] Worker processes are launched with 240.08204197883606 seconds.
+Processed prompts: 100%|███████████████████████| 1/1 [00:21<00:00, 21.84s/it, est. speed input: 0.00 toks/s, output: 0.00 toks/s]
+Generated 1 outputs
+==================================================
+
+Hello. My name is
+Alice and I'm here to help you. What can I do for you today?
+Hello Alice! I'm trying to organize a small party
+```
diff --git a/docs/zh/get_started/installation/metax_gpu.md b/docs/zh/get_started/installation/metax_gpu.md
new file mode 100644
index 0000000000..63724684a4
--- /dev/null
+++ b/docs/zh/get_started/installation/metax_gpu.md
@@ -0,0 +1,82 @@
+# 使用 Metax GPU C550 运行ERNIE 4.5 系列模型
+
+FastDeploy在Metax C550上对ERNIE 4.5系列模型进行了深度适配和优化,实现了推理入口和GPU的统一,无需修改即可完成推理任务的迁移。
+
+环境准备:
+- Python >= 3.10
+- Linux X86_64
+
+| Chip Type | Driver Version | KMD Version |
+| :---: | :---: | :---: |
+| MetaX C550 | 3.0.0.1 | 2.14.6 |
+
+## 1. 容器镜像获取
+
+```shell
+docker login --username=cr_temp_user --password=eyJpbnN0YW5jZUlkIjoiY3JpLXpxYTIzejI2YTU5M3R3M2QiLCJ0aW1lIjoiMTc1NTUxODEwODAwMCIsInR5cGUiOiJzdWIiLCJ1c2VySWQiOiIyMDcwOTQwMTA1NjYzNDE3OTIifQ:8226ca50ce5476c42062e24d3c465545de1c1780 cr.metax-tech.com && docker pull cr.metax-tech.com/public-library/maca-native:3.0.0.4-ubuntu20.04-amd64
+```
+
+## 2. 预安装
+
+```shell
+pip install paddlepaddle==3.0.0.dev20250729 -i https://www.paddlepaddle.org.cn/packages/nightly/cpu/
+pip install paddle-metax-gpu==3.0.0.dev20250807 -i https://www.paddlepaddle.org.cn/packages/nightly/maca/
+```
+
+## 3. FastDeploy代码下载并编译
+
+```shell
+git clone https://github.com/PaddlePaddle/FastDeploy
+cd FastDeploy
+bash build.sh
+```
+The built packages will be in the `FastDeploy/dist` directory.
+
+## 4. 环境验证
+
+After installation, verify the environment with this Python code:
+```python
+import paddle
+from paddle.jit.marker import unified
+# Verify GPU availability
+paddle.utils.run_check()
+# Verify FastDeploy custom operators compilation
+from fastdeploy.model_executor.ops.gpu import beam_search_softmax
+```
+If the above code executes successfully, the environment is ready.
+
+## 5. 示例
+
+```python
+from fastdeploy import LLM, SamplingParams
+
+prompts = [
+    "Hello. My name is",
+]
+
+sampling_params = SamplingParams(top_p=0.95, max_tokens=32, temperature=0.6)
+
+llm = LLM(model="/root/model/ERNIE-4.5-21B-A3B-Paddle", tensor_parallel_size=1, max_model_len=256, engine_worker_queue_port=9135, quantization='wint8', static_decode_blocks=0, gpu_memory_utilization=0.9)
+
+outputs = llm.generate(prompts, sampling_params)
+
+print(f"Generated {len(outputs)} outputs")
+print("=" * 50 + "\n")
+
+for output in outputs:
+    prompt = output.prompt
+    generated_text = output.outputs.text
+    print(prompt)
+    print(generated_text)
+    print("-" * 50)
+```
+
+输出:
+
+```
+INFO 2025-08-18 10:54:18,455 416822 engine.py[line:202] Waiting worker processes ready...
+Loading Weights: 100%|█████████████████████████| 100/100 [03:33<00:00, 2.14s/it]
+Loading Layers: 100%|██████████████████████████| 100/100 [00:18<00:00, 5.54it/s]
+INFO 2025-08-18 10:58:16,149 416822 engine.py[line:247] Worker processes are launched with 240.08204197883606 seconds.
+Processed prompts: 100%|███████████████████████| 1/1 [00:21<00:00, 21.84s/it, est. speed input: 0.00 toks/s, output: 0.00 toks/s]
+Generated 1 outputs
+==================================================
+
+Hello. My name is
+Alice and I'm here to help you. What can I do for you today?
+Hello Alice! I'm trying to organize a small party
+```
diff --git a/fastdeploy/engine/config.py b/fastdeploy/engine/config.py
index 7b6d1bffb4..ca93bde726 100644
--- a/fastdeploy/engine/config.py
+++ b/fastdeploy/engine/config.py
@@ -282,7 +282,7 @@ def check(self):
             f"should be larger than or equal to max_num_seqs: {self.max_num_seqs}"
         )
         assert self.max_num_batched_tokens <= self.max_model_len * self.max_num_seqs, (
-            f"max_num_batched_tokens: {self.max_num_batched_tokens} should be larger"
+            f"max_num_batched_tokens: {self.max_num_batched_tokens} should be less"
             f"than or equal to max_num_seqs: {self.max_num_seqs} * max_model_len: {self.max_model_len}"
         )
         assert (
diff --git a/fastdeploy/model_executor/layers/backends/metax/attention/flash_attn_backend.py b/fastdeploy/model_executor/layers/backends/metax/attention/flash_attn_backend.py
index a67ae76e25..29789fe686 100644
--- a/fastdeploy/model_executor/layers/backends/metax/attention/flash_attn_backend.py
+++ b/fastdeploy/model_executor/layers/backends/metax/attention/flash_attn_backend.py
@@ -256,7 +256,7 @@ def apply_rope(self, qk, cos, sin):
         )
         out = paddle.add(paddle.multiply(qk, cos), paddle.multiply(rotate_half, sin))
         return paddle.cast(out, qk.dtype)
-
+    @paddle.no_grad()
     def forward_native_backend(
         self,
         q: paddle.Tensor,
@@ -273,7 +273,7 @@ def forward_native_backend(
         # 1. 
分离 encoder / decoder 的 mask seq_lens_encoder = forward_meta.seq_lens_encoder.squeeze(-1) seq_lens_decoder = forward_meta.seq_lens_decoder.squeeze(-1) - seq_lens_this_time = forward_meta.seq_lens_this_time.squeeze(-1) + seq_lens_this_time = forward_meta.seq_lens_this_time encoder_indices = [] decoder_indices = [] diff --git a/fastdeploy/model_executor/layers/backends/metax/moe/fused_moe_triton_metax_backend.py b/fastdeploy/model_executor/layers/backends/metax/moe/fused_moe_triton_metax_backend.py index 50ceecf18f..b457a3ae75 100644 --- a/fastdeploy/model_executor/layers/backends/metax/moe/fused_moe_triton_metax_backend.py +++ b/fastdeploy/model_executor/layers/backends/metax/moe/fused_moe_triton_metax_backend.py @@ -44,7 +44,7 @@ def __init__(self, quant_config=None): def process_prequanted_weights(self, layer: nn.Layer, state_dict) -> None: """process_prequanted_weights""" pass - + @paddle.no_grad() def create_weights(self, layer: nn.Layer, state_dict): """ Triton MoE create weight process. @@ -124,12 +124,12 @@ def create_weights(self, layer: nn.Layer, state_dict): ), ) getattr(layer, scale_name).set_value(quanted_weight_scale) - + @paddle.no_grad() def apply( self, layer: nn.Layer, x: paddle.Tensor, - gate_out: paddle.Tensor, + gate: nn.Layer, ) -> paddle.Tensor: """ Triton compute Fused MoE. @@ -141,6 +141,7 @@ def apply( moe_intermediate_size = layer.moe_intermediate_size hidden_size = layer.hidden_size + gate_out = gate(x.cast("float32")) topk_ids, topk_weights = fastdeploy.model_executor.ops.gpu.moe_topk_select( gate_out, layer.gate_correction_bias, diff --git a/fastdeploy/model_executor/layers/backends/metax/moe/triton_moe_kernels.py b/fastdeploy/model_executor/layers/backends/metax/moe/triton_moe_kernels.py index e859e7ce45..87d45e5f07 100644 --- a/fastdeploy/model_executor/layers/backends/metax/moe/triton_moe_kernels.py +++ b/fastdeploy/model_executor/layers/backends/metax/moe/triton_moe_kernels.py @@ -17,7 +17,6 @@ import triton import triton.language as tl - @triton.jit def fused_moe_kernel_paddle( a_ptr, diff --git a/fastdeploy/worker/metax_model_runner.py b/fastdeploy/worker/metax_model_runner.py index d0a820dbd2..95b0cb310d 100644 --- a/fastdeploy/worker/metax_model_runner.py +++ b/fastdeploy/worker/metax_model_runner.py @@ -53,9 +53,7 @@ step_cuda, ) from fastdeploy.platforms import current_platform - -if not current_platform.is_dcu(): - from fastdeploy.spec_decode import MTPProposer, NgramProposer +from fastdeploy.spec_decode import MTPProposer, NgramProposer from fastdeploy import envs from fastdeploy.input.mm_processor import DataProcessor @@ -130,7 +128,7 @@ def __init__( shape=[self.parallel_config.max_num_seqs, 1], fill_value=4, dtype="int64", - ) + ).cpu() self.restore_chunked_prefill_request = dict() # Initialize attention Backend @@ -164,6 +162,7 @@ def _init_speculative_proposer(self): if self.speculative_method == "ngram": self.proposer = NgramProposer(self.fd_config) elif self.speculative_method == "mtp": + self.share_inputs["seq_lens_this_time"] = self.seq_lens_this_time_buffer self.proposer = MTPProposer( self.fd_config, self.get_model(), @@ -193,21 +192,23 @@ def _init_logits_processor(self, request): return self.guided_backend.get_logits_processor(schemata_key=schemata_key), schemata_key - def insert_tasks_v1(self, req_dicts: List[Request]): + def insert_tasks_v1(self, req_dicts: List[Request], num_running_requests: int = None): """ Process scheduler output tasks, used when ENABLE_V1_KVCACHE_SCHEDULER=1 + req_dict: A list of Request dict + 
num_running_requests: batch_size """ - # NOTE(luotingdan): Lazy initialize kv cache + # Lazy initialize kv cache if "caches" not in self.share_inputs: self.initialize_kv_cache() req_len = len(req_dicts) has_prefill_task = False + has_decode_task = False for i in range(req_len): request = req_dicts[i] idx = request.idx if request.task_type.value == RequestType.PREFILL.value: # prefill task - logger.debug(f"Handle prefill request {request} at idx {idx}") prefill_start_index = request.prefill_start_index prefill_end_index = request.prefill_end_index length = prefill_end_index - prefill_start_index @@ -253,6 +254,11 @@ def insert_tasks_v1(self, req_dicts: List[Request]): ) input_ids = request.prompt_token_ids + request.output_token_ids + logger.debug( + f"Handle prefill request {request} at idx {idx}, " + f"{prefill_start_index=}, {prefill_end_index=}, " + f"need_prefilled_token_num={len(input_ids)}" + ) self.share_inputs["input_ids"][idx : idx + 1, :length] = np.array( input_ids[prefill_start_index:prefill_end_index] ) @@ -264,7 +270,7 @@ def insert_tasks_v1(self, req_dicts: List[Request]): ) self.share_inputs["stop_flags"][idx : idx + 1] = False self.share_inputs["seq_lens_decoder"][idx : idx + 1] = prefill_start_index - self.share_inputs["seq_lens_this_time"][idx : idx + 1] = length + self.seq_lens_this_time_buffer[idx : idx + 1] = length self.share_inputs["seq_lens_encoder"][idx : idx + 1] = length self.share_inputs["step_seq_lens_decoder"][idx : idx + 1] = 0 self.share_inputs["prompt_lens"][idx : idx + 1] = len(input_ids) @@ -281,22 +287,27 @@ def insert_tasks_v1(self, req_dicts: List[Request]): self.share_inputs["block_tables"][idx : idx + 1, :encoder_block_num] = np.array( request.block_tables, dtype="int32" ) + if self.share_inputs["is_block_step"][idx]: # has tasks to continue to decode + has_decode_task = True continue else: # preempted task logger.debug(f"Handle preempted request {request} at idx {idx}") self.share_inputs["block_tables"][idx : idx + 1, :] = -1 self.share_inputs["stop_flags"][idx : idx + 1] = True - self.share_inputs["seq_lens_this_time"][idx : idx + 1] = 0 + self.seq_lens_this_time_buffer[idx : idx + 1] = 0 self.share_inputs["seq_lens_decoder"][idx : idx + 1] = 0 self.share_inputs["seq_lens_encoder"][idx : idx + 1] = 0 self.share_inputs["is_block_step"][idx : idx + 1] = False continue - if len(request.eos_token_ids) < self.parallel_config.eos_tokens_lens: - request.eos_token_ids.append(request.eos_token_ids[0]) + assert len(request.eos_token_ids) == self.model_config.eos_tokens_lens self.share_inputs["eos_token_id"][:] = np.array(request.eos_token_ids, dtype="int64").reshape(-1, 1) self.share_inputs["top_p"][idx : idx + 1] = request.get("top_p", 0.7) + self.share_inputs["top_k"][idx : idx + 1] = request.get("top_k", 0) + self.share_inputs["top_k_list"][idx] = request.get("top_k", 0) + self.share_inputs["min_p"][idx : idx + 1] = request.get("min_p", 0.0) + self.share_inputs["min_p_list"][idx] = request.get("min_p", 0.0) self.share_inputs["temperature"][idx : idx + 1] = request.get("temperature", 0.95) self.share_inputs["penalty_score"][idx : idx + 1] = request.get("repetition_penalty", 1.0) self.share_inputs["frequency_score"][idx : idx + 1] = request.get("frequency_penalty", 0.0) @@ -326,12 +337,15 @@ def insert_tasks_v1(self, req_dicts: List[Request]): else: self.share_inputs["stop_seqs_len"][idx : idx + 1, :] = 0 - if has_prefill_task: + if has_prefill_task or has_decode_task: self.share_inputs["not_need_stop"][0] = True + self.share_inputs["seq_lens_this_time"] = 
self.seq_lens_this_time_buffer[:num_running_requests] - def insert_prefill_inputs(self, req_dicts: List[Request]): + def insert_prefill_inputs(self, req_dicts: List[Request], num_running_requests: int = None): """ Process inputs for prefill tasks and insert it to share_inputs buffer + req_dict: A list of Request dict + num_running_requests: batch_size TODO(gongshaotian): Refactor this func """ @@ -365,7 +379,7 @@ def insert_prefill_inputs(self, req_dicts: List[Request]): self.share_inputs["prompt_ids"][idx : idx + 1, :length] = np.array(request.prompt_token_ids) self.share_inputs["seq_lens_encoder"][idx : idx + 1] = 0 self.share_inputs["seq_lens_decoder"][idx : idx + 1] = length - self.share_inputs["seq_lens_this_time"][idx : idx + 1] = 1 + self.seq_lens_this_time_buffer[idx : idx + 1] = 1 self.share_inputs["step_seq_lens_encoder"][idx : idx + 1] = 0 self.share_inputs["step_seq_lens_decoder"][idx : idx + 1] = length self.share_inputs["prompt_lens"][idx : idx + 1] = length @@ -377,7 +391,7 @@ def insert_prefill_inputs(self, req_dicts: List[Request]): request.draft_token_ids[0:num_prefill_send_token], dtype="int64", ) - self.share_inputs["seq_lens_this_time"][idx : idx + 1] = num_prefill_send_token + self.seq_lens_this_time_buffer[idx : idx + 1] = num_prefill_send_token else: self.share_inputs["pre_ids"][idx : idx + 1] = -1 self.share_inputs["step_idx"][idx : idx + 1] = 0 @@ -412,7 +426,7 @@ def insert_prefill_inputs(self, req_dicts: List[Request]): ) self.share_inputs["seq_lens_decoder"][idx : idx + 1] = request.get("seq_lens_decoder", 0) self.share_inputs["step_seq_lens_decoder"][idx : idx + 1] = request.get("seq_lens_decoder", 0) - self.share_inputs["seq_lens_this_time"][idx : idx + 1] = token_chunk_size + self.seq_lens_this_time_buffer[idx : idx + 1] = token_chunk_size self.share_inputs["step_seq_lens_encoder"][idx : idx + 1] = token_chunk_size self.share_inputs["seq_lens_encoder"][idx : idx + 1] = token_chunk_size self.share_inputs["prompt_lens"][idx : idx + 1] = token_chunk_size @@ -430,7 +444,7 @@ def insert_prefill_inputs(self, req_dicts: List[Request]): else: self.share_inputs["seq_lens_decoder"][idx : idx + 1] = request.get("seq_lens_decoder", 0) self.share_inputs["step_seq_lens_decoder"][idx : idx + 1] = request.get("seq_lens_decoder", 0) - self.share_inputs["seq_lens_this_time"][idx : idx + 1] = length + self.seq_lens_this_time_buffer[idx : idx + 1] = length self.share_inputs["step_seq_lens_encoder"][idx : idx + 1] = length self.share_inputs["seq_lens_encoder"][idx : idx + 1] = length self.share_inputs["prompt_lens"][idx : idx + 1] = length @@ -453,12 +467,13 @@ def get_attr_from_request(request, attr, default_value=None): else: return default_value - if len(request.eos_token_ids) < self.parallel_config.eos_tokens_lens: - request.eos_token_ids.append(request.eos_token_ids[0]) + assert len(request.eos_token_ids) == self.model_config.eos_tokens_lens self.share_inputs["eos_token_id"][:] = np.array(request.eos_token_ids, dtype="int64").reshape(-1, 1) self.share_inputs["top_p"][idx : idx + 1] = get_attr_from_request(request, "top_p", 0.7) self.share_inputs["top_k"][idx : idx + 1] = request.get("top_k", 0) + self.share_inputs["top_k_list"][idx] = request.get("top_k", 0) self.share_inputs["min_p"][idx : idx + 1] = request.get("min_p", 0.0) + self.share_inputs["min_p_list"][idx] = request.get("min_p", 0.0) self.share_inputs["temperature"][idx : idx + 1] = get_attr_from_request(request, "temperature", 0.95) self.share_inputs["penalty_score"][idx : idx + 1] = get_attr_from_request( @@ 
-489,13 +504,15 @@ def get_attr_from_request(request, attr, default_value=None): request.block_tables, dtype="int32" ) - if request.get("bad_words_token_ids") is not None: + if request.get("bad_words_token_ids") is not None and len(request.get("bad_words_token_ids")) > 0: bad_words_len = len(request.get("bad_words_token_ids")) - if bad_words_len > 0: - self.share_inputs["bad_tokens_len"][idx : idx + 1] = bad_words_len - self.share_inputs["bad_tokens"][idx : idx + 1, :bad_words_len] = np.array( - request.get("bad_words_token_ids"), dtype="int64" - ) + self.share_inputs["bad_tokens_len"][idx : idx + 1] = bad_words_len + self.share_inputs["bad_tokens"][idx : idx + 1, :bad_words_len] = np.array( + request.get("bad_words_token_ids"), dtype="int64" + ) + else: + self.share_inputs["bad_tokens_len"][idx : idx + 1] = 1 + self.share_inputs["bad_tokens"][idx : idx + 1, :] = np.array([-1], dtype="int64") if request.get("stop_token_ids") is not None and request.get("stop_seqs_len") is not None: stop_seqs_num = len(request.get("stop_seqs_len")) @@ -514,46 +531,56 @@ def get_attr_from_request(request, attr, default_value=None): self.share_inputs["not_need_stop"][0] = True + self.share_inputs["seq_lens_this_time"] = self.seq_lens_this_time_buffer[:num_running_requests] + if self.speculative_method in ["mtp"]: - self.proposer.insert_prefill_inputs(req_dicts) + self.proposer.insert_prefill_inputs(req_dicts, num_running_requests) def _dummy_prefill_inputs(self, num_tokens: int, batch_size: int, expected_decode_len: int): - """Set dummy prefill inputs to share_inputs""" - # NOTE(gongshaotian): The maximum decoding length is equal to the expected decoded tokens plus the eos token - max_dec_len = expected_decode_len + 1 - full_length = min( - num_tokens // batch_size, - self.parallel_config.max_model_len - max_dec_len, - ) - input_length = int(full_length * self.cache_config.kv_cache_ratio) - block_num = ( - input_length + self.cache_config.block_size - 1 - ) // self.cache_config.block_size + self.cache_config.enc_dec_block_num - - for i in range(batch_size): - idx = i - self.share_inputs["input_ids"][idx : idx + 1, :input_length] = np.array([5] * input_length) - self.share_inputs["prompt_ids"][idx : idx + 1, :input_length] = np.array([5] * input_length) - self.share_inputs["eos_token_id"][:] = np.array([2], dtype="int64").reshape(-1, 1) - self.share_inputs["seq_lens_this_time"][idx : idx + 1] = input_length - self.share_inputs["step_seq_lens_encoder"][idx : idx + 1] = input_length - self.share_inputs["seq_lens_encoder"][idx : idx + 1] = input_length - self.share_inputs["seq_lens_decoder"][idx : idx + 1] = 0 - self.share_inputs["prompt_lens"][idx : idx + 1] = 0 - self.share_inputs["step_idx"][idx : idx + 1] = 0 - self.share_inputs["max_dec_len"][idx : idx + 1] = max_dec_len - self.share_inputs["min_dec_len"][idx : idx + 1] = max_dec_len - self.share_inputs["stop_flags"][idx : idx + 1] = False - self.share_inputs["temperature"][idx : idx + 1] = 1 + """Set dummy prefill inputs to share_inputs""" + # NOTE(gongshaotian): The maximum decoding length is equal to the expected decoded tokens plus the eos token + max_dec_len = expected_decode_len + 1 + full_length = min( + num_tokens // batch_size, + self.parallel_config.max_model_len - max_dec_len, + ) - self.share_inputs["first_token_ids"][idx : idx + 1] = self.share_inputs["input_ids"][idx : idx + 1, :1] - self.share_inputs["ori_seq_lens_encoder"][idx : idx + 1] = input_length + # When the full length is too large, DeepEP's buffer size will not be enough to cause the 
result to appear nan. + # Figure out the accurate buffer size of DeepEP. + if self.fd_config.parallel_config.enable_expert_parallel: + full_length = min(full_length, 32) + + input_length = int(full_length * self.cache_config.kv_cache_ratio) + block_num = ( + input_length + self.cache_config.block_size - 1 + ) // self.cache_config.block_size + self.cache_config.enc_dec_block_num + + for i in range(batch_size): + idx = i + self.share_inputs["input_ids"][idx : idx + 1, :input_length] = np.array([5] * input_length) + self.share_inputs["prompt_ids"][idx : idx + 1, :input_length] = np.array([5] * input_length) + self.share_inputs["eos_token_id"][:] = np.array( + [2] * self.model_config.eos_tokens_lens, dtype="int64" + ).reshape(-1, 1) + self.seq_lens_this_time_buffer[idx : idx + 1] = input_length + self.share_inputs["step_seq_lens_encoder"][idx : idx + 1] = input_length + self.share_inputs["seq_lens_encoder"][idx : idx + 1] = input_length + self.share_inputs["seq_lens_decoder"][idx : idx + 1] = 0 + self.share_inputs["prompt_lens"][idx : idx + 1] = 0 + self.share_inputs["step_idx"][idx : idx + 1] = 0 + self.share_inputs["max_dec_len"][idx : idx + 1] = max_dec_len + self.share_inputs["min_dec_len"][idx : idx + 1] = max_dec_len + self.share_inputs["stop_flags"][idx : idx + 1] = False + self.share_inputs["temperature"][idx : idx + 1] = 1 - self.share_inputs["encoder_block_lens"][idx : idx + 1] = block_num - self.share_inputs["block_tables"][idx : idx + 1, :block_num] = np.arange( - idx * block_num, (idx + 1) * block_num, 1 - ) + self.share_inputs["first_token_ids"][idx : idx + 1] = self.share_inputs["input_ids"][idx : idx + 1, :1] + self.share_inputs["ori_seq_lens_encoder"][idx : idx + 1] = input_length + self.share_inputs["encoder_block_lens"][idx : idx + 1] = block_num + self.share_inputs["block_tables"][idx : idx + 1, :block_num] = np.arange( + idx * block_num, (idx + 1) * block_num, 1 + ) + self.share_inputs["seq_lens_this_time"] = self.seq_lens_this_time_buffer def _init_share_inputs(self, max_num_seqs: int): """ Initialize all share buffers for model inputs. 
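
The model-runner changes in this file follow one pattern: the old per-step tensor `share_inputs["seq_lens_this_time"]` becomes a persistent `seq_lens_this_time_buffer` sized to `max_num_seqs`; when requests are inserted, only the first `num_running_requests` slots are exposed to the model as `share_inputs["seq_lens_this_time"]`, and `execute_model` copies the updated lengths back into the buffer. A minimal sketch of that pattern is below; `StepBuffers` and its method names are illustrative stand-ins, not FastDeploy API:

```python
import paddle


class StepBuffers:
    """Toy stand-in for the runner's persistent per-request buffers."""

    def __init__(self, max_num_seqs: int):
        # Allocated once for the maximum batch size, mirroring
        # self.seq_lens_this_time_buffer in the diff.
        self.seq_lens_this_time_buffer = paddle.full([max_num_seqs], 0, dtype="int32")
        self.share_inputs = {}

    def begin_step(self, num_running_requests: int):
        # Expose only the slots of the currently running requests,
        # as insert_tasks_v1 / insert_prefill_inputs now do.
        self.share_inputs["seq_lens_this_time"] = self.seq_lens_this_time_buffer[:num_running_requests]

    def end_step(self, num_running_requests: int):
        # Write the post-step lengths back into the persistent buffer,
        # mirroring the copy_(..., False) call added at the end of execute_model.
        self.seq_lens_this_time_buffer[:num_running_requests].copy_(
            self.share_inputs["seq_lens_this_time"][:num_running_requests], False
        )


bufs = StepBuffers(max_num_seqs=4)
bufs.begin_step(num_running_requests=2)
# ... the forward pass and scheduler update share_inputs["seq_lens_this_time"] ...
bufs.end_step(num_running_requests=2)
```

Keeping the full-size buffer lets a request keep its slot across steps even as the number of running requests changes, while the model only ever sees a batch-sized view.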
@@ -568,18 +595,20 @@ def _init_share_inputs(self, max_num_seqs: int): ) self.share_inputs["input_ids"] = paddle.full( [max_num_seqs, self.parallel_config.max_model_len], - self.parallel_config.pad_token_id, + self.model_config.pad_token_id, dtype="int64", ) self.share_inputs["prompt_ids"] = paddle.full( [max_num_seqs, self.parallel_config.max_model_len], - self.parallel_config.pad_token_id, + self.model_config.pad_token_id, dtype="int64", ) - self.share_inputs["eos_token_id"] = paddle.full([self.parallel_config.eos_tokens_lens, 1], 0, dtype="int64") + self.share_inputs["eos_token_id"] = paddle.full([self.model_config.eos_tokens_lens, 1], 0, dtype="int64") self.share_inputs["top_p"] = paddle.full([max_num_seqs, 1], self.model_config.top_p, dtype="float32") self.share_inputs["top_k"] = paddle.full([max_num_seqs, 1], 0, dtype="int64") + self.share_inputs["top_k_list"] = [0] * max_num_seqs self.share_inputs["min_p"] = paddle.full([max_num_seqs, 1], 0.0, dtype="float32") + self.share_inputs["min_p_list"] = [0.0] * max_num_seqs self.share_inputs["temperature"] = paddle.full( [max_num_seqs, 1], self.model_config.temperature, dtype="float32" ) @@ -603,7 +632,7 @@ def _init_share_inputs(self, max_num_seqs: int): self.share_inputs["max_length"] = paddle.full( [max_num_seqs, 1], self.model_config.max_model_len, dtype="int64" ) - self.share_inputs["seq_lens_this_time"] = paddle.full(max_num_seqs, 0, dtype="int32") + self.seq_lens_this_time_buffer = paddle.full(max_num_seqs, 0, dtype="int32") self.share_inputs["seq_lens_encoder"] = paddle.full([max_num_seqs, 1], 0, dtype="int32") self.share_inputs["seq_lens_decoder"] = paddle.full([max_num_seqs, 1], 0, dtype="int32") self.share_inputs["step_seq_lens_encoder"] = paddle.full([max_num_seqs, 1], 0, dtype="int32") @@ -626,7 +655,7 @@ def _init_share_inputs(self, max_num_seqs: int): self.share_inputs["need_block_list"] = paddle.full([max_num_seqs], -1, dtype="int32") self.share_inputs["need_block_len"] = paddle.full([1], 0, dtype="int32") self.share_inputs["used_list_len"] = paddle.full([max_num_seqs], 0, dtype="int32") - self.share_inputs["infer_seed"] = paddle.full([max_num_seqs, 1], 0, dtype="int64") + self.share_inputs["infer_seed"] = paddle.full([max_num_seqs, 1], 0, dtype="int64").cpu() self.share_inputs["first_token_ids"] = paddle.full([max_num_seqs, 1], -1, dtype="int64") self.share_inputs["ori_seq_lens_encoder"] = paddle.full([max_num_seqs, 1], 0, dtype="int32") self.share_inputs["system_lens"] = paddle.full([max_num_seqs, 1], 0, dtype="int32") @@ -795,7 +824,10 @@ def _prepare_inputs(self) -> None: temperature=self.share_inputs["temperature"], top_p=self.share_inputs["top_p"], top_k=self.share_inputs["top_k"], + top_k_list=self.share_inputs["top_k_list"], min_p=self.share_inputs["min_p"], + min_p_list=self.share_inputs["min_p_list"], + seed=self.share_inputs["infer_seed"], step_idx=self.share_inputs["step_idx"], pre_token_ids=self.share_inputs["pre_ids"], prompt_ids=self.share_inputs["prompt_ids"], @@ -933,7 +965,7 @@ def initialize_kv_cache(self, profile: bool = False) -> None: self.share_inputs["caches"] = list(cache_kvs.values()) for value in cache_kvs.values(): del value - paddle.device.cuda.empty_cache() + #paddle.device.empty_cache() def initialize_attn_backend(self) -> None: """ @@ -1247,6 +1279,7 @@ def _get_skip_idx(self, model_forward_batch: Optional[List[Request]] = None): def execute_model( self, model_forward_batch: Optional[List[Request]] = None, + num_running_requests: int = None, ) -> Optional[ModelRunnerOutput]: """ The Entrance 
of model execute. @@ -1255,6 +1288,7 @@ def execute_model( class at the server level, which is too granular for ModelRunner. We plan to replace it with 'ModelForwardBatch'. intermediate_tensors: + num_running_requests: batch_size """ # 1. Prepare inputs of model and sampler. skip_idx_list = self._get_skip_idx(model_forward_batch) @@ -1397,6 +1431,9 @@ class at the server level, which is too granular for ModelRunner. self._update_chunked_prefill(model_forward_batch) self._add_cache(model_forward_batch) + self.seq_lens_this_time_buffer[:num_running_requests].copy_( + self.share_inputs["seq_lens_this_time"][:num_running_requests], False + ) return None def _add_cache(self, model_forward_batch) -> None: @@ -1528,7 +1565,7 @@ def clear_parameters(self, pid): """ " Dynamic model loader use to clear parameters use for RL""" self.dynamic_weight_manager.clear_parameters(pid) self.clear_cache() - paddle.device.cuda.empty_cache() + #paddle.device.empty_cache() self.dynamic_weight_manager._log_memory("dynamic weight manager clear all memory") def update_parameters(self, pid): diff --git a/fastdeploy/worker/metax_worker.py b/fastdeploy/worker/metax_worker.py index ddf36580c7..2eb5210a55 100644 --- a/fastdeploy/worker/metax_worker.py +++ b/fastdeploy/worker/metax_worker.py @@ -21,11 +21,15 @@ import paddle from paddle import nn +import pymxsml from fastdeploy import envs from fastdeploy.config import FDConfig from fastdeploy.engine.request import Request -from fastdeploy.utils import get_logger +from fastdeploy.platforms import current_platform +from fastdeploy.plugins.model_runner import load_model_runner_plugins +from fastdeploy.utils import get_logger, set_random_seed +from fastdeploy.worker.model_runner_base import ModelRunnerBase from fastdeploy.worker.metax_model_runner import MetaxModelRunner from fastdeploy.worker.output import ModelRunnerOutput from fastdeploy.worker.worker_base import WorkerBase @@ -60,7 +64,7 @@ def init_device(self): paddle.set_default_dtype(self.parallel_config.dtype) gc.collect() - paddle.device.cuda.empty_cache() + else: raise RuntimeError(f"Not support device type: {self.device_config.device}") @@ -92,77 +96,75 @@ def determine_available_memory(self) -> int: You may limit the usage of GPU memory by adjusting the `gpu_memory_utilization` parameter. """ - """Will implement later""" - - # 1. 
Record memory state before profile run - start_time = time.perf_counter() - Gb = 1024**3 - - local_rank = self.local_rank % self.max_chips_per_node - paddle.device.cuda.reset_max_memory_reserved(local_rank) - paddle.device.cuda.reset_max_memory_allocated(local_rank) - # max memory for Allocator - paddle_reserved_mem_before_run = paddle.device.cuda.max_memory_reserved(local_rank) - # max memory for Tensor - paddle_allocated_mem_before_run = paddle.device.cuda.max_memory_allocated(local_rank) # not reserved - - device_id = int(self.device_ids[local_rank]) - if os.getenv("MACA_VISIBLE_DEVICES") is not None: - device_id = int(os.getenv("MACA_VISIBLE_DEVICES").split(",")[device_id]) - - import pymxsml - - pymxsml.mxSmlInit() - info = pymxsml.mxSmlGetMemoryInfo(device_id) - before_run_meminfo_total = info.vramTotal * 1024 - before_run_meminfo_used = info.vramUse * 1024 - before_run_meminfo_free = before_run_meminfo_total - before_run_meminfo_used - - logger.info("Before running the profile, the memory usage info of Metax GPU is as follows:") - logger.info(f"Device Index: {device_id}") - logger.info(f"Device Total memory: {before_run_meminfo_total / Gb}") - logger.info(f"Device used memory: {before_run_meminfo_used / Gb}") - logger.info(f"Device free memory: {before_run_meminfo_free / Gb}") - logger.info(f"Paddle reserved memory: {paddle_reserved_mem_before_run / Gb}") - logger.info(f"Paddle allocated memory: {paddle_allocated_mem_before_run / Gb}") - - # 2. Profile run - self.model_runner.profile_run() - - # 3. Statistical memory information - paddle_reserved_mem_after_run = paddle.device.cuda.max_memory_reserved(local_rank) - paddle_allocated_mem_after_run = paddle.device.cuda.max_memory_allocated(local_rank) - - model_block_memory_used = self.cal_theortical_kvcache() - paddle_peak_increase = paddle_reserved_mem_after_run - paddle_allocated_mem_before_run - - paddle.device.cuda.empty_cache() - - info = pymxsml.mxSmlGetMemoryInfo(device_id) - after_run_meminfo_total = info.vramTotal * 1024 - after_run_meminfo_used = info.vramUse * 1024 - after_run_meminfo_free = after_run_meminfo_total - after_run_meminfo_used - - available_kv_cache_memory = ( - after_run_meminfo_total * self.cache_config.gpu_memory_utilization - - after_run_meminfo_used - - paddle_peak_increase - ) - available_kv_cache_memory += model_block_memory_used * self.parallel_config.total_block_num - end_time = time.perf_counter() + # temporary fix kvcache size to test + fd_kvache_mem = os.getenv("FD_KVCACHE_MEM") + if fd_kvache_mem is not None: + return int(float(fd_kvache_mem) * 1024**3) + else: + # 1. 
Record memory state before profile run + start_time = time.perf_counter() + Gb = 1024**3 + + local_rank = self.local_rank % self.max_chips_per_node + paddle.device.cuda.reset_max_memory_reserved(local_rank) + paddle.device.cuda.reset_max_memory_allocated(local_rank) + # max memory for Allocator + paddle_reserved_mem_before_run = paddle.device.cuda.max_memory_reserved(local_rank) + # max memory for Tensor + paddle_allocated_mem_before_run = paddle.device.cuda.max_memory_allocated(local_rank) # not reserved + + device_id = int(self.device_ids[local_rank]) + if os.getenv("MACA_VISIBLE_DEVICES") is not None: + device_id = int(os.getenv("MACA_VISIBLE_DEVICES").split(",")[device_id]) + + pymxsml.mxSmlInit() + info = pymxsml.mxSmlGetMemoryInfo(device_id) + before_run_meminfo_total = info.vramTotal * 1024 + before_run_meminfo_used = info.vramUse * 1024 + before_run_meminfo_free = before_run_meminfo_total - before_run_meminfo_used + + logger.info("Before running the profile, the memory usage info of Metax GPU is as follows:") + logger.info(f"Device Index: {device_id}") + logger.info(f"Device Total memory: {before_run_meminfo_total / Gb}") + logger.info(f"Device used memory: {before_run_meminfo_used / Gb}") + logger.info(f"Device free memory: {before_run_meminfo_free / Gb}") + logger.info(f"Paddle reserved memory: {paddle_reserved_mem_before_run / Gb}") + logger.info(f"Paddle allocated memory: {paddle_allocated_mem_before_run / Gb}") + + # 2. Profile run + self.model_runner.profile_run() + + # 3. Statistical memory information + paddle_reserved_mem_after_run = paddle.device.cuda.max_memory_reserved(local_rank) + paddle_allocated_mem_after_run = paddle.device.cuda.max_memory_allocated(local_rank) + + model_block_memory_used = self.cal_theortical_kvcache() + paddle_peak_increase = paddle_reserved_mem_after_run - paddle_allocated_mem_before_run + + paddle.device.cuda.empty_cache() + + info = pymxsml.mxSmlGetMemoryInfo(device_id) + after_run_meminfo_total = info.vramTotal * 1024 + after_run_meminfo_used = info.vramUse * 1024 + after_run_meminfo_free = after_run_meminfo_total - after_run_meminfo_used + + available_kv_cache_memory = ((after_run_meminfo_free - paddle_peak_increase )* self.cache_config.gpu_memory_utilization) + available_kv_cache_memory += model_block_memory_used * self.parallel_config.total_block_num + + end_time = time.perf_counter() - logger.info("After running the profile, the memory usage info of Metax GPU is as follows:") - logger.info(f"Device Index: {device_id}") - logger.info(f"Device Total memory: {after_run_meminfo_total / Gb}") - logger.info(f"Device used memory: {after_run_meminfo_used / Gb}") - logger.info(f"Device free memory: {after_run_meminfo_free / Gb}") - logger.info(f"Paddle reserved memory: {paddle_reserved_mem_after_run / Gb}") - logger.info(f"Paddle allocated memory: {paddle_allocated_mem_after_run / Gb}") - logger.info(f"Paddle available_kv_cache_memory: {available_kv_cache_memory / Gb}") - logger.info(f"Profile time: {end_time - start_time}") + logger.info("After running the profile, the memory usage info of Metax GPU is as follows:") + logger.info(f"Device Index: {device_id}") + logger.info(f"Device Total memory: {after_run_meminfo_total / Gb}") + logger.info(f"Device used memory: {after_run_meminfo_used / Gb}") + logger.info(f"Device free memory: {after_run_meminfo_free / Gb}") + logger.info(f"Paddle reserved memory: {paddle_reserved_mem_after_run / Gb}") + logger.info(f"Paddle allocated memory: {paddle_allocated_mem_after_run / Gb}") + logger.info(f"Paddle 
available_kv_cache_memory: {available_kv_cache_memory / Gb}") + logger.info(f"Profile time: {end_time - start_time}") - return available_kv_cache_memory + return available_kv_cache_memory def load_model(self) -> None: """Load model""" @@ -180,24 +182,32 @@ def initialize_cache(self, num_gpu_blocks: int) -> None: def execute_model( self, model_forward_batch: Optional[List[Request]] = None, + num_running_request: int = None, ) -> Optional[ModelRunnerOutput]: """ """ - output = self.model_runner.execute_model(model_forward_batch) + output = self.model_runner.execute_model(model_forward_batch, num_running_request) return output - def preprocess_new_task(self, req_dicts: List[Request]) -> None: + def preprocess_new_task(self, req_dicts: List[Request], num_running_requests: int) -> None: """Process new requests and then start the decode loop and workers and modelrunners should not perceive it. """ if envs.ENABLE_V1_KVCACHE_SCHEDULER: - self.model_runner.insert_tasks_v1(req_dicts=req_dicts) + self.model_runner.insert_tasks_v1(req_dicts=req_dicts, num_running_requests=num_running_requests) else: - self.model_runner.insert_prefill_inputs(req_dicts=req_dicts) - + self.model_runner.insert_prefill_inputs(req_dicts=req_dicts, num_running_requests=num_running_requests) + def graph_optimize_and_warm_up_model(self) -> None: + """ + Perform the warm-up and the graph optimization + """ + if self.model_runner.graph_opt_level >= 1: + self.model_runner.sot_warmup() + # Todo Triger cuda grpah capture. + def check_health(self) -> bool: """ """ return True def cal_theortical_kvcache(self) -> int: """Calculate the block memory required""" - return self.model_runner.cal_theortical_kvcache() + return self.model_runner.cal_theortical_kvcache() \ No newline at end of file diff --git a/requirements_metaxgpu.txt b/requirements_metaxgpu.txt index 305f9825fe..7aa310fa23 100644 --- a/requirements_metaxgpu.txt +++ b/requirements_metaxgpu.txt @@ -37,3 +37,4 @@ opentelemetry-instrumentation-mysql opentelemetry-distro  opentelemetry-exporter-otlp opentelemetry-instrumentation-fastapi +partial_json_parser
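
For reference, the KV-cache sizing that the new `determine_available_memory` implements reduces to a small formula: use the `FD_KVCACHE_MEM` override (in GiB) when it is set; otherwise take the VRAM still free after the profile run, subtract Paddle's peak allocator growth, scale by `gpu_memory_utilization`, and add back the memory already held by the profile run's KV blocks. A standalone sketch of that arithmetic follows; the function and argument names are illustrative, not FastDeploy API:

```python
import os


def estimate_kv_cache_bytes(
    free_vram_after_profile: int,   # bytes free on the device after profile_run()
    paddle_peak_increase: int,      # reserved peak minus pre-profile allocation, in bytes
    gpu_memory_utilization: float,  # same knob as cache_config.gpu_memory_utilization
    block_bytes: int,               # cal_theortical_kvcache(): bytes per KV block
    total_block_num: int,           # KV blocks already allocated for the profile run
) -> int:
    """Mirror the arithmetic of the profiling branch in determine_available_memory."""
    # Environment override, mirroring the FD_KVCACHE_MEM check (value in GiB).
    fd_kvcache_mem = os.getenv("FD_KVCACHE_MEM")
    if fd_kvcache_mem is not None:
        return int(float(fd_kvcache_mem) * 1024**3)

    # Usable share of what is still free, plus the blocks the profile run
    # already holds (they are reclaimed for the real cache).
    available = (free_vram_after_profile - paddle_peak_increase) * gpu_memory_utilization
    available += block_bytes * total_block_num
    return int(available)


# Example: 30 GiB free, 2 GiB peak growth, 0.9 utilization, 2 MiB blocks, 1000 blocks.
print(estimate_kv_cache_bytes(30 * 1024**3, 2 * 1024**3, 0.9, 2 * 1024**2, 1000))
```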