From bda8ad278ee112f6b07d478f4ffad5bdb18f5515 Mon Sep 17 00:00:00 2001 From: Wanglongzhi2001 <583087864@qq.com> Date: Mon, 14 Jul 2025 13:45:58 +0800 Subject: [PATCH 01/10] Support mixed ep --- fastdeploy/config.py | 22 +- fastdeploy/model_executor/layers/moe/ep.py | 49 +- .../layers/moe/fused_moe_backend_base.py | 29 +- fastdeploy/worker/gpu_model_runner.py | 12 +- fastdeploy/worker/vl_gpu_model_runner.py | 1256 +++++++++++++++++ fastdeploy/worker/worker_process.py | 9 + 6 files changed, 1347 insertions(+), 30 deletions(-) create mode 100644 fastdeploy/worker/vl_gpu_model_runner.py diff --git a/fastdeploy/config.py b/fastdeploy/config.py index 89efeee6f3..6b763dcde2 100644 --- a/fastdeploy/config.py +++ b/fastdeploy/config.py @@ -30,13 +30,23 @@ logger = get_logger("config", "config.log") -class MoEPhase(Enum): +class MoEPhase: """ The generation phase of the moe. """ + def __init__(self, phase="prefill"): + self._phase = phase - PREFILL = 1 - DECODER = 2 + @property + def phase(self): + return self._phase + + @phase.setter + def phase(self, value): + if value not in ["prefill", "decode"]: + raise ValueError(f"The moe_phase is invalid, only support prefill and decode, but got {value}") + else: + self._phase = value class ErnieArchitectures: @@ -210,11 +220,11 @@ def __init__( setattr(self, key, value) self.use_ep = args["expert_parallel_size"] > 1 if self.splitwise_role == "mixed": - self.moe_phase = MoEPhase.PREFILL + self.moe_phase = MoEPhase(phase="prefill") elif self.splitwise_role == "prefill": - self.moe_phase = MoEPhase.PREFILL + self.moe_phase = MoEPhase(phase="prefill") elif self.splitwise_role == "decode": - self.moe_phase = MoEPhase.DECODER + self.moe_phase = MoEPhase(phase="decode") else: raise NotImplementedError diff --git a/fastdeploy/model_executor/layers/moe/ep.py b/fastdeploy/model_executor/layers/moe/ep.py index d7463a0f96..a387f5680e 100644 --- a/fastdeploy/model_executor/layers/moe/ep.py +++ b/fastdeploy/model_executor/layers/moe/ep.py @@ -43,7 +43,6 @@ def __init__( num_max_dispatch_tokens_per_rank: int, hidden: int, num_experts: int, - moe_phase: MoEPhase, ep_size: int, ep_rank: int, async_finish: bool = False, @@ -65,10 +64,10 @@ def __init__( self.hidden = hidden self.num_experts = num_experts self.num_local_experts = num_experts // ep_size - self.moe_phase = moe_phase self.async_finish = async_finish - self.deepep_engine = None + self.prefill_deepep_engine = None + self.decode_deepep_engine = None if moe_phase == MoEPhase.DECODER: logger.info("Initializing Low Latency Buffer") @@ -105,14 +104,14 @@ def get_low_latency_buffer(self): ) # Allocate a buffer if not existed or not enough buffer size if ( - self.deepep_engine is None - or self.deepep_engine.group != self.group - or not self.deepep_engine.low_latency_mode - or self.deepep_engine.num_rdma_bytes < num_rdma_bytes + self.decode_deepep_engine is None + or self.decode_deepep_engine.group != self.group + or not self.decode_deepep_engine.low_latency_mode + or self.decode_deepep_engine.num_rdma_bytes < num_rdma_bytes ): # NOTES: for best performance, the QP number **must** be equal to the number of the local experts assert self.num_experts % self.ep_size == 0 - self.deepep_engine = deep_ep.Buffer( + self.decode_deepep_engine = deep_ep.Buffer( self.group, 0, num_rdma_bytes, @@ -149,7 +148,7 @@ def low_latency_dispatch( handle, _, dispatch_hook, - ) = self.deepep_engine.low_latency_dispatch( + ) = self.decode_deepep_engine.low_latency_dispatch( hidden_states, topk_idx, expertwise_scale, @@ -175,7 +174,7 @@ def 
low_latency_combine( combined_hidden_states: [num_tokens, hidden] """ - combined_hidden_states, _, combine_hook = self.deepep_engine.low_latency_combine( + combined_hidden_states, _, combine_hook = self.decode_deepep_engine.low_latency_combine( hidden_states, topk_idx, topk_weights, @@ -189,7 +188,7 @@ def clean_low_latency_buffer(self): """ clean_low_latency_buffer """ - self.deepep_engine.clean_low_latency_buffer( + self.decode_deepep_engine.clean_low_latency_buffer( self.num_max_dispatch_tokens_per_rank, self.hidden, self.num_experts ) @@ -197,7 +196,8 @@ def barrier_all(self): """ barrier_all """ - self.deepep_engine.barrier_all() + self.prefill_deepep_engine.barrier_all() + self.decode_deepep_engine.barrier_all() class EPRunner: @@ -210,7 +210,6 @@ def __init__( top_k: int, hidden: int, num_experts: int, - moe_phase: MoEPhase, num_max_dispatch_tokens_per_rank: int = 1, ep_size: int = 1, ep_rank: int = 0, @@ -294,7 +293,7 @@ def __init__( top_k, hidden, num_experts, - MoEPhase.PREFILL, + num_max_dispatch_tokens_per_rank=256, ep_size=ep_size, ep_rank=ep_rank, redundant_experts_num=redundant_experts_num, @@ -314,7 +313,7 @@ def dispatch( num_tokens_per_expert, is_token_in_rank, _, - ) = self.ep_engine.deepep_engine.get_dispatch_layout(topk_idx, self.num_experts) + ) = self.ep_engine.prefill_deepep_engine.get_dispatch_layout(topk_idx, self.num_experts) x_scale_tensor = kwargs.get("x_scale_tensor", None) dispatch_args = { @@ -327,7 +326,7 @@ def dispatch( "topk_idx": topk_idx, "topk_weights": topk_weights, } - return self.ep_engine.deepep_engine.dispatch(**dispatch_args) + return self.ep_engine.prefill_deepep_engine.dispatch(**dispatch_args) def combine( self, @@ -342,7 +341,7 @@ def combine( "async_finish": self.ep_engine.async_finish, "topk_weights": recv_topk_weights, } - fused_moe_out, _, _ = self.ep_engine.deepep_engine.combine(**combine_args) + fused_moe_out, _, _ = self.ep_engine.prefill_deepep_engine.combine(**combine_args) return fused_moe_out @@ -393,6 +392,22 @@ def dispatch( return recv_hidden_states, recv_expert_count, handle def combine(self, ffn_out, topk_idx, topk_weights, handle): + # TODO(@wufeisheng): Delete them when deepep in PaddlePaddle is fixed + ( + src_info, + layout_range, + num_max_dispatch_tokens_per_rank, + num_experts, + ) = handle + + handle = ( + src_info, + layout_range, + num_max_dispatch_tokens_per_rank, + None, + num_experts, + ) + print(f'ffn_out shape: {ffn_out.shape}, num_ranks: {8}, num_max_dispatch_tokens_per_rank: {num_max_dispatch_tokens_per_rank}') combined_hidden_states, combine_hook = self.ep_engine.low_latency_combine( ffn_out, topk_idx, topk_weights, handle ) diff --git a/fastdeploy/model_executor/layers/moe/fused_moe_backend_base.py b/fastdeploy/model_executor/layers/moe/fused_moe_backend_base.py index ad46d00c0e..0de6d1f13c 100644 --- a/fastdeploy/model_executor/layers/moe/fused_moe_backend_base.py +++ b/fastdeploy/model_executor/layers/moe/fused_moe_backend_base.py @@ -45,9 +45,11 @@ def init_ep(self, layer: nn.Layer) -> None: Init EP related module """ if layer.ep_size > 1: - if layer.fd_config.parallel_config.moe_phase == MoEPhase.DECODER: - from .ep import EPDecoderRunner - + if layer.fd_config.parallel_config.splitwise_role == "mixed": + from .ep import EPPrefillRunner, EPDecoderRunner + self.ep_prefill_runner = EPPrefillRunner( + layer.top_k, layer.hidden_size, layer.num_experts, + layer.ep_size, layer.ep_rank) self.ep_decoder_runner = EPDecoderRunner( layer.top_k, layer.hidden_size, @@ -58,7 +60,17 @@ def init_ep(self, layer: 
nn.Layer) -> None: layer.fd_config.model_config.redundant_experts_num, ) else: - from .ep import EPPrefillRunner + if layer.fd_config.parallel_config.moe_phase == "prefill": + from .ep import EPPrefillRunner + self.ep_prefill_runner = EPPrefillRunner( + layer.top_k, layer.hidden_size, layer.num_experts, + layer.ep_size, layer.ep_rank) + else: + from .ep import EPDecoderRunner + self.ep_decoder_runner = EPDecoderRunner( + layer.top_k, layer.hidden_size, layer.num_experts, + layer.moe_config.num_max_dispatch_tokens_per_rank, + layer.ep_size, layer.ep_rank) self.ep_prefill_runner = EPPrefillRunner( layer.top_k, @@ -141,9 +153,14 @@ def apply( Paddle Cutlass compute Fused MoE. """ if layer.ep_size > 1: - if layer.fd_config.parallel_config.moe_phase == MoEPhase.PREFILL: + if layer.fd_config.parallel_config.moe_phase.phase == "prefill": + print("Apply ep prefill") return self.apply_ep_prefill(layer, x, gate_out) - else: + elif layer.fd_config.parallel_config.moe_phase.phase == "decode": + print("Apply ep decode") return self.apply_ep_decode(layer, x, gate_out) + else: + logger.error( + f"invalid value of moe_phase={layer.fd_config.parallel_config.moe_phase.phase}") else: return self.apply_tp(layer, x, gate_out) diff --git a/fastdeploy/worker/gpu_model_runner.py b/fastdeploy/worker/gpu_model_runner.py index ecdc4bf37e..776f96faeb 100644 --- a/fastdeploy/worker/gpu_model_runner.py +++ b/fastdeploy/worker/gpu_model_runner.py @@ -23,7 +23,7 @@ from paddle import nn from paddleformers.utils.log import logger -from fastdeploy.config import FDConfig +from fastdeploy.config import FDConfig, MoEPhase from fastdeploy.engine.request import Request, RequestType from fastdeploy.model_executor.graph_optimization.utils import ( profile_run_guard, @@ -1164,7 +1164,17 @@ class at the server level, which is too granular for ModelRunner. intermediate_tensors: """ # NOTE(wufeisheng): For Expert Parallelism + is_decode_batch = paddle.to_tensor(not ((self.share_inputs["seq_lens_this_time"] + > 1).sum() > 0)) + paddle.distributed.broadcast(is_decode_batch, src=0) + self.fd_config.parallel_config.moe_phase.phase = "decode" if is_decode_batch else "prefill" + print(f'rank: {self.rank}, is decode batch: {is_decode_batch}, seq_lens_encoder: {self.share_inputs["seq_lens_encoder"]}') + + # NOTE(wufeisheng): If `not_need_stop`` is False, it means the current worker is in an idle state. + # This logic is not used in TP (Tensor Parallelism) mode. However, in EP (Expert Parallelism) mode, + # when there is data on other runner, the current runner is required to execute part of the model. if not self.not_need_stop(): + print("got into empty_input") self._execute_empty_input() return None diff --git a/fastdeploy/worker/vl_gpu_model_runner.py b/fastdeploy/worker/vl_gpu_model_runner.py new file mode 100644 index 0000000000..43592441a0 --- /dev/null +++ b/fastdeploy/worker/vl_gpu_model_runner.py @@ -0,0 +1,1256 @@ +""" +# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License" +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+""" +import argparse +import json +import os +import random +from typing import Optional + +import numpy as np +import paddle +import paddle.distributed.fleet as fleet +from paddleformers.transformers.model_utils import load_tp_checkpoint +from safetensors import safe_open + +from fastdeploy.config import (DeviceConfig, FDConfig, GraphOptimizationConfig, + KVCacheConfig, LoadConfig, ModelConfig, + MoEConfig, MoEPhase, ParallelConfig, + SpeculativeConfig) +from fastdeploy.input.ernie_tokenizer import ErnieBotTokenizer +from fastdeploy.input.mm_processor import DataProcessor +from fastdeploy.model_executor.layers.attention import get_attention_backend +from fastdeploy.model_executor.layers.rotary_embedding import get_rope_3d +from fastdeploy.model_executor.layers.sample.meta_data import SamplingMetadata +from fastdeploy.model_executor.layers.sample.sampler import Sampler +from fastdeploy.model_executor.models.ernie4_5_moe import \ + Ernie4_5_PretrainedModel +from fastdeploy.model_executor.models.ernie4_5_vl.configuration import \ + Ernie4_5_VLMoeConfig +from fastdeploy.model_executor.models.ernie4_5_vl.dfnrope import \ + DFNRopeVisionTransformerConfig +from fastdeploy.model_executor.models.ernie4_5_vl.dfnrope.modeling import \ + DFNRopeVisionTransformerPretrainedModel +from fastdeploy.model_executor.models.ernie4_5_vl.modeling_resampler import ( + ScatterOp, VariableResolutionResamplerModel) +from fastdeploy.platforms import current_platform +from fastdeploy.worker.forward_meta import ForwardMeta +from fastdeploy.worker.utils import check_safetensors_model +from fastdeploy.worker.vl_model_runner_base import VLModelRunnerBase + +if current_platform.is_cuda() and current_platform.available(): + from fastdeploy.model_executor.layers.utils import ( + remove_padding, speculate_remove_padding) + +from fastdeploy.model_executor.ops.gpu import (save_output, + set_stop_value_multi_ends, + set_value_by_flags_and_idx, + update_inputs) + + +class GPUVLModelRunner(VLModelRunnerBase): + """ + The GPUVLModelRunner class for vision-language tasks on GPU. 
+ """ + + def __init__( + self, + config: ModelConfig, + args: argparse.Namespace, + nranks: int, + rank: int, + ) -> None: + """ + GPUVLModelRunner init + """ + self.nranks = nranks + self.rank = rank + + hcg = fleet.get_hybrid_communicate_group() + self.tensor_parallel_degree = max(hcg.get_model_parallel_world_size(), + 1) + self.tensor_parallel_rank = hcg.get_model_parallel_rank() + self.mp_src_rank = hcg.get_model_parallel_group_src_rank() + self.mp_group = hcg.get_model_parallel_group() + self.is_safetensors_model = check_safetensors_model( + args.model_name_or_path) + + model_path = os.path.dirname(args.model_name_or_path) + args.llm_model_name_or_path = args.model_name_or_path + if not self.is_safetensors_model: + args.tokenizer = args.image_preprocessor = model_path + else: + args.tokenizer = args.image_preprocessor = args.model_name_or_path + args.vision_model_name_or_path = os.path.join( + model_path, "DFNRopeVisionTransformer") + + self.amp_black = [ + "reduce_sum", + "c_softmax_with_cross_entropy", + "elementwise_div", + "sin", + "cos", + "sort", + "multinomial", + ] + self.amp_white = [ + "lookup_table", + "lookup_table_v2", + "flash_attn", + "matmul", + "matmul_v2", + "fused_gemm_epilogue", + ] + + super().__init__(config, args) + self.init_extra_input(config, args) + + self._reset_paddle_env() + + self.sampler = Sampler() + + def _reset_paddle_env(self): + pass + + def update_chunked_prefill(self, tasks: list[any]) -> None: + """ + update chunked prefill + """ + if not self.args.enable_chunked_prefill: + return + + for task in tasks: + if task.chunk_idx > len(task.prefill_chunk_info): + continue + + idx = task.idx + if task.chunk_idx == len(task.prefill_chunk_info): + self.share_inputs["seq_lens_this_time"][idx:idx + 1] = 1 + self.share_inputs['seq_lens_encoder'][idx:idx + 1] = 0 + self.share_inputs["seq_lens_decoder"][idx:idx + + 1] = task.start_idx + self.share_inputs["step_idx"][idx:idx + 1] = 1 + else: + inputs = self._preprocess_task( + task.prefill_chunk_info[task.chunk_idx]) + if inputs.get("images") is not None: + self.share_inputs[ + "image_features"] = self.extract_vision_features( + inputs) + else: + # Compatible with the situation that lacks images and videos + self.share_inputs["image_features"] = None + + token_chunk_size = inputs["input_ids"].shape[1] + self.share_inputs["input_ids"][ + idx:idx + 1, :token_chunk_size] = inputs["input_ids"] + self.share_inputs["seq_lens_this_time"][idx:idx + + 1] = token_chunk_size + self.share_inputs['seq_lens_encoder'][idx:idx + + 1] = token_chunk_size + self.share_inputs["seq_lens_decoder"][idx:idx + + 1] = task.start_idx + self.share_inputs["step_idx"][idx:idx + 1] = 0 + + task.start_idx += token_chunk_size + task.chunk_idx += 1 + + def _load_model( + self, + model_name: str, + dynamic_load_weight: int = 0, + ) -> None: + """ + Load the model from the given model name. 
+ """ + + vocab_file_names = [ + "tokenizer.model", "spm.model", "ernie_token_100k.model" + ] + for i in range(len(vocab_file_names)): + if os.path.exists( + os.path.join(self.args.tokenizer, vocab_file_names[i])): + ErnieBotTokenizer.resource_files_names[ + "vocab_file"] = vocab_file_names[i] + break + + tokenizer = ErnieBotTokenizer.from_pretrained( + self.args.tokenizer, + model_max_length=self.args.max_model_len, + padding_side="right", + use_fast=False, + ) + tokenizer.ignored_index = -100 + if tokenizer.pad_token is None: + tokenizer.pad_token = tokenizer.unk_token + + config = Ernie4_5_VLMoeConfig.from_pretrained( + self.args.llm_model_name_or_path, + tensor_parallel_degree=self.tensor_parallel_degree, + tensor_parallel_rank=self.tensor_parallel_rank, + moe_group="dummy", + ) + self.model_cfg = config + if self.is_safetensors_model: + meta_json = os.path.join(self.args.model_name_or_path, + "model.safetensors.index.json") + if os.path.exists(meta_json): + with open( + os.path.join(self.args.model_name_or_path, + "model.safetensors.index.json"), + "r") as f: + self.weight_map = json.load(f)["weight_map"] + else: + self.weight_map = {} + with safe_open(os.path.join(self.args.model_name_or_path, + "model.safetensors"), + framework="np") as f: + keys = f.keys() + for k in keys: + self.weight_map[k] = "model.safetensors" + + if self.is_safetensors_model: + vision_config = config.vision_config + vision_config.tensor_parallel_degree = self.tensor_parallel_degree + vision_config.tensor_parallel_rank = self.tensor_parallel_rank + vision_config.attn_sep = False + vision_config.dtype = "bfloat16" + else: + vision_config = DFNRopeVisionTransformerConfig.from_pretrained( + self.args.vision_model_name_or_path, + tensor_parallel_degree=self.tensor_parallel_degree, + tensor_parallel_rank=self.tensor_parallel_rank, + attn_sep=False, + dtype="bfloat16", + ) + config.vision_config = vision_config + self.vision_config = vision_config + config.pixel_hidden_size = config.vision_config.hidden_size + config.im_patch_id = tokenizer.get_vocab()["<|IMAGE_PLACEHOLDER|>"] + config.think_end_id = tokenizer.get_vocab()[""] + config.max_text_id = config.im_patch_id + + config.sequence_parallel = False + + self.dtype = self.args.dtype + paddle.set_default_dtype(self.dtype) + + self.vision_model, self.resampler_model = self.inject_pp_vision_model( + self.args, config) + + processor = DataProcessor( + tokenizer_name=self.args.tokenizer, + image_preprocessor_name=str(self.args.image_preprocessor), + ) + processor.eval() + image_preprocess = processor.image_preprocessor + image_preprocess.image_mean_tensor = paddle.to_tensor( + image_preprocess.image_mean, dtype="float32").reshape([1, 3, 1, 1]) + image_preprocess.image_std_tensor = paddle.to_tensor( + image_preprocess.image_std, dtype="float32").reshape([1, 3, 1, 1]) + image_preprocess.rescale_factor = paddle.to_tensor( + image_preprocess.rescale_factor, dtype="float32") + image_preprocess.image_mean_tensor = image_preprocess.image_mean_tensor.squeeze( + [-2, -1]).repeat_interleave(config.vision_config.patch_size**2 * 1, + -1) + image_preprocess.image_std_tensor = image_preprocess.image_std_tensor.squeeze( + [-2, -1]).repeat_interleave(config.vision_config.patch_size**2 * 1, + -1) + self.image_preprocess = image_preprocess + + graph_opt_config = GraphOptimizationConfig( + self.args.enable_static_graph_inference, self.args.use_cudagraph, + self.args.max_capture_batch_size) + + fd_config, self.model = build_stream_line_model( + self.args.model_name_or_path, + 
self.args.dtype, + self.args.block_size, + max_model_len=self.args.max_model_len, + tokenizer=tokenizer, + quantization=self.args.quantization, + graph_opt_config=graph_opt_config, + ) + self.model.eval() + self.set_state_dict(self.args) + + fd_config.parallel_config.max_model_len = fd_config.model_config.max_seq_len + self.fd_config = fd_config + attn_backend_cls = get_attention_backend() + num_heads = self.fd_config.model_config.num_attention_heads // \ + self.fd_config.parallel_config.tensor_parallel_degree + self.fd_config.model_config.kv_num_heads = int( + self.fd_config.model_config.num_key_value_heads + ) // self.fd_config.parallel_config.tensor_parallel_degree + head_dim = self.fd_config.model_config.head_dim + self.attn_backend = attn_backend_cls( + self.fd_config, + kv_num_heads=self.fd_config.model_config.kv_num_heads, + num_heads=num_heads, + head_dim=head_dim) + self._init_kvcache() + + def init_extra_input(self, config: ModelConfig, args: argparse.Namespace) -> None: + """ + Initialize extra input tensors. + """ + head_dim = self.model_cfg.head_dim + self.share_inputs.update({ + "rope_emb": + paddle.full(shape=[ + args.max_num_seqs, 2, 1, self.max_length, 1, head_dim // 2 + ], + fill_value=0, + dtype="float32") + }) + self.share_inputs.update({"image_features": None}) + self.share_inputs.update({ + "need_think_end": + paddle.full(shape=[args.max_num_seqs, 1], + fill_value=0, + dtype="int32") + }) + self.share_inputs.update({ + "enable_thinking": + paddle.full(shape=[1], fill_value=True, dtype="bool") + }) + self.share_inputs.update({ + "reasoning_index": + paddle.full(shape=[args.max_num_seqs, 1], + fill_value=0, + dtype="int32") + }) + + def init_rotary_position_embedding(self, max_model_len: int) -> None: + """ + Init rotary position embedding + """ + pass + + def _init_kvcache(self): + """ + Init kv cache + """ + cache_kvs = {} + total_block_num = self.num_gpu_blocks + num_layers = self.model_cfg.get("num_layers", + None) or self.model_cfg.get( + "num_hidden_layers", None) + + kv_num_head = self.model_cfg.get( + "num_key_value_heads", + self.model_cfg.num_attention_heads, + ) + kv_num_head = kv_num_head // self.tensor_parallel_degree + self.model_cfg.kv_num_head = kv_num_head + + for i in range(num_layers): + cache_type = self.args.dtype + cache_kvs["key_caches_{}".format(i)] = paddle.full( + shape=[ + total_block_num, + kv_num_head, + self.args.block_size, + self.model_cfg.head_dim, + ], + fill_value=0, + dtype=cache_type, + ) + cache_kvs["value_caches_{}".format(i)] = paddle.full( + shape=[ + total_block_num, + kv_num_head, + self.args.block_size, + self.model_cfg.head_dim, + ], + fill_value=0, + dtype=cache_type, + ) + + self.share_inputs["caches"] = list(cache_kvs.values()) + for value in cache_kvs.values(): + del value + paddle.device.cuda.empty_cache() + + def clear_parameters(self, pid: int) -> None: + """ clear_parameters """ + if "caches" in self.share_inputs: + self.model.clear_parameters(pid) + del self.share_inputs["caches"] + paddle.device.cuda.empty_cache() + self.model.log_memory_usage("clear all memory") + + def update_parameters(self, pid: int) -> None: + """ update_parameters """ + if "caches" not in self.share_inputs: + self.model.update_parameters(pid) + self._init_kvcache() + self.model.log_memory_usage("update all memory") + + @paddle.no_grad() + def set_state_dict(self, args: argparse.Namespace) -> None: + """set_state_dict""" + if not self.is_safetensors_model: + rank_model_paths = [] + for root, dirs, files in 
os.walk(self.args.llm_model_name_or_path): + for file in files: + if file == f"model_state.tp0{self.tensor_parallel_rank}.pdparams": + rank_model_paths.append(os.path.join(root, file)) + elif file == "model_state.pdparams": + rank_model_paths.append(os.path.join(root, file)) + state_dict = {} + for path in rank_model_paths: + loaded_dict = paddle.load(path, return_numpy=True) + state_dict.update(loaded_dict) + + resampler_state = {} + for key in list(state_dict.keys()): + if "vision" in key: + state_dict.pop(key) + if key.startswith("ernie.resampler_model."): + value = state_dict.pop(key) + value = paddle.to_tensor(value).cast("bfloat16") + value = value.numpy() + resampler_state[ + key[len("ernie.resampler_model."):]] = value + elif key.startswith("resampler_model."): + value = state_dict.pop(key) + value = paddle.to_tensor(value).cast("bfloat16") + value = value.numpy() + resampler_state[key[len("resampler_model."):]] = value + self.model.set_state_dict(state_dict) + self.resampler_model.set_state_dict(resampler_state) + else: + state_dict = load_tp_checkpoint( + args.model_name_or_path, + Ernie4_5_PretrainedModel, + self.model_cfg, + return_numpy=True, + ) + for key in list(state_dict.keys()): + if key.startswith("vision_model.") or key.startswith( + "ernie.resampler_model."): + state_dict.pop(key) + self.model.set_state_dict(state_dict) + + @paddle.no_grad() + def vit_load( + self, + model_path: str, + tensor_parallel_degree: int, + tensor_parallel_rank: int, + ) -> None: + """ + Load vit tp weight + """ + if tensor_parallel_degree == 1: + rank_model_path = os.path.join(model_path, "model_state.pdparams") + else: + rank_model_path = os.path.join( + model_path, f"model_state_tp0{tensor_parallel_rank}.pdparams") + if os.path.exists(rank_model_path): + return paddle.load(rank_model_path, return_numpy=True) + else: + raise ValueError(f"No such a file {rank_model_path}") + + @paddle.no_grad() + def inject_pp_vision_model(self, args: argparse.Namespace, cfg: Ernie4_5_VLMoeConfig): + """ + Inject pp vision model + """ + + def set_vision_state_dict(model, + tensor_parallel_degree: int=8, + tensor_parallel_rank: int=0, + name: str=""): + """ + Set vision model weight + """ + model_state_dict = model.state_dict() + compat_keys = [name + k for k in model_state_dict.keys()] + model_files = set() + for k in compat_keys: + if k in self.weight_map.keys(): + model_files.add( + os.path.join(args.model_name_or_path, + self.weight_map[k])) + state_dict = {} + for model_file in model_files: + with safe_open(model_file, framework="np") as f: + for k in f.keys(): + if k in compat_keys: + new_k = k.replace(name, "") + tensor = f.get_tensor(k) + if tensor_parallel_degree > 1: + if "resampler_model" in name and new_k == "spatial_linear.0.weight": + tensor = np.split( + tensor, tensor_parallel_degree, + axis=0)[tensor_parallel_rank] + elif name == "vision_model.": + if "attn.proj.weight" in new_k or "fc2.weight" in new_k: + tensor = np.split( + tensor, + tensor_parallel_degree, + axis=0)[tensor_parallel_rank] + elif "fc1.weight" in new_k or "fc1.bias" in new_k: + tensor = np.split( + tensor, + tensor_parallel_degree, + axis=-1)[tensor_parallel_rank] + elif "qkv.weight" in new_k: + head_dim = self.vision_config.hidden_size // self.vision_config.num_heads + tensor = tensor.reshape([ + self.vision_config.hidden_size, 3, + self.vision_config.num_heads, + head_dim + ]) + tensor = np.split( + tensor, + tensor_parallel_degree, + axis=-2 + )[tensor_parallel_rank].reshape([ + self.vision_config.hidden_size, -1 + ]) + 
elif "qkv.bias" in new_k: + head_dim = self.vision_config.hidden_size // self.vision_config.num_heads + tensor = tensor.reshape([ + 3, self.vision_config.num_heads, + head_dim + ]) + tensor = np.split( + tensor, + tensor_parallel_degree, + axis=-2 + )[tensor_parallel_rank].reshape([-1]) + state_dict[new_k] = tensor + model.set_state_dict(state_dict) + + vision_model = DFNRopeVisionTransformerPretrainedModel( + cfg.vision_config) + vision_model = paddle.amp.decorate(models=vision_model, + level="O2", + dtype="bfloat16") + vision_model.eval() + if not self.is_safetensors_model: + vit_state_dict = self.vit_load(args.vision_model_name_or_path, + self.tensor_parallel_degree, + self.tensor_parallel_rank) + vision_model.set_state_dict(vit_state_dict) + else: + set_vision_state_dict( + vision_model, + tensor_parallel_degree=self.tensor_parallel_degree, + tensor_parallel_rank=self.tensor_parallel_rank, + name="vision_model.", + ) + + resampler_model = VariableResolutionResamplerModel( + cfg.pixel_hidden_size, + cfg.hidden_size, + cfg.spatial_conv_size, + cfg.temporal_conv_size, + config=cfg, + ) + resampler_model = paddle.amp.decorate(models=resampler_model, + level="O2", + dtype="bfloat16") + resampler_model.eval() + if self.is_safetensors_model: + is_ernie_begin = False + for k in self.weight_map.keys(): + if k.startswith("ernie.resampler_model."): + is_ernie_begin = True + set_vision_state_dict( + resampler_model, + tensor_parallel_degree=self.tensor_parallel_degree, + tensor_parallel_rank=self.tensor_parallel_rank, + name="ernie.resampler_model." + if is_ernie_begin else "resampler_model.", + ) + return vision_model, resampler_model + + @paddle.no_grad() + def extract_vision_features(self, inputs: list[paddle.Tensor]) -> paddle.Tensor: + """extract_vision_features""" + assert inputs["images"] is not None + grid_thw = inputs["grid_thw"] + + images = inputs["images"].cast("float32") + images = self.image_preprocess.rescale_factor * images - self.image_preprocess.image_mean_tensor + images = images / self.image_preprocess.image_std_tensor + images = images.cast("bfloat16") + + token_type_ids = inputs["token_type_ids"] + token_type_ids_w_video = token_type_ids + input_ids = inputs["input_ids"] + # convert to img patch id + image_mask = input_ids == self.model_cfg.im_patch_id + image_type_ids = inputs["image_type_ids"] + with paddle.amp.auto_cast( + True, + custom_black_list=self.amp_black, + custom_white_list=self.amp_white, + level="O2", + dtype=self.dtype, + ): + image_features = self.vision_model.extract_feature( + images, grid_thw) + if self.tensor_parallel_degree > 1: + S, C = image_features.shape + image_features = image_features.reshape( + [-1, C * self.model_cfg.spatial_conv_size**2]) + image_features = ScatterOp.apply(image_features, + axis=-1) # mp 切 Fea + image_features = image_features.reshape([S, -1]) + image_features = self.resampler_model( + image_features, + image_mask, + token_type_ids_w_video, + image_type_ids, + grid_thw, + ) + return image_features + + @paddle.no_grad() + def prepare_rope3d(self, position_ids: paddle.Tensor, **kwargs) -> paddle.Tensor: + """prepare_rope3d""" + + prefix_max_position_ids = paddle.max(position_ids) + 1 + dec_pos_ids = paddle.tile( + paddle.arange(kwargs["max_length"], + dtype="int64").unsqueeze(0).unsqueeze(-1), [1, 1, 3]) + dec_pos_ids = dec_pos_ids + prefix_max_position_ids + position_ids_3d_real = paddle.concat([position_ids, dec_pos_ids], + axis=1) + + rope_emb = get_rope_3d( + position_ids=position_ids_3d_real, + 
rotary_dim=self.model_cfg.head_dim, + paritial_rotary_factor=1.0, + base=self.model_cfg.rope_theta, + max_position=self.args.max_model_len, + freq_allocation=self.model_cfg.freq_allocation, + ) + return rope_emb + + def prefill_finished(self): + """ + Verify prefill operation completion + """ + prefill_statue = (self.share_inputs["seq_lens_this_time"] != 0) & ( + self.share_inputs["seq_lens_this_time"] != 1) + return not paddle.any(prefill_statue).numpy() + + def dy_input_preprocess(self, tasks: list[any]) -> None: + """ + dynamic insertion + """ + + def get_numeric_value(task, key, default_value): + if task.get(key, None) is not None: + return task.get(key) + else: + return default_value + + for i in range(len(tasks)): + task = tasks[i] + idx = task.idx + + kwargs = { + "max_length": + get_numeric_value(task, "max_tokens", 2048), + "top_p": + get_numeric_value(task, "top_p", 0.8), + "temperature": + get_numeric_value(task, "temperature", 0.2), + "top_k": + get_numeric_value(task, "top_k", 0), + "penalty_score": + get_numeric_value(task, "repetition_penalty", 1.0), + "frequency_score": + get_numeric_value(task, "frequency_penalty", 0.0), + "presence_score": + get_numeric_value(task, "presence_penalty", 0.0), + "decode_strategy": + "sampling", + "pad_token_id": + self.args.pad_token_id, + "enable_thinking": + get_numeric_value(task, "enable_thinking", True), + "reasoning_max_tokens": + get_numeric_value(task, "reasoning_max_tokens", 2048), + } + + if self.args.enable_chunked_prefill: + task.set("chunk_idx", 1) + inputs = self._preprocess_task(task.prefill_chunk_info[0]) + if inputs.get("images") is not None: + self.share_inputs[ + "image_features"] = self.extract_vision_features( + inputs) + else: + # Compatible with the situation that lacks images and videos + self.share_inputs["image_features"] = None + if task.multimodal_inputs["position_ids"] is not None: + position_ids = paddle.to_tensor( + task.multimodal_inputs["position_ids"], + dtype="int64").unsqueeze([0]) + else: + position_ids = None + + token_chunk_size = inputs["input_ids"].shape[1] + task.set("start_idx", token_chunk_size) + self.share_inputs["input_ids"][ + idx:idx + 1, :token_chunk_size] = inputs["input_ids"] + self.share_inputs["seq_lens_this_time"][idx:idx + + 1] = token_chunk_size + self.share_inputs["seq_lens_encoder"][idx:idx + + 1] = token_chunk_size + self.share_inputs["step_seq_lens_encoder"][ + idx:idx + 1] = token_chunk_size + else: + inputs = self._preprocess_task(task.multimodal_inputs) + if inputs.get("images") is not None: + self.share_inputs[ + "image_features"] = self.extract_vision_features( + inputs) + else: + # Compatible with the situation that lacks images and videos + self.share_inputs["image_features"] = None + position_ids = inputs["position_ids"] + + length = inputs["input_ids"].shape[1] + self.share_inputs["input_ids"][ + idx:idx + 1, :length] = inputs["input_ids"] + self.share_inputs["seq_lens_this_time"][idx:idx + 1] = length + self.share_inputs["seq_lens_encoder"][idx:idx + 1] = length + self.share_inputs["step_seq_lens_encoder"][idx:idx + + 1] = length + + # force + self.share_inputs["enable_thinking"][:] = kwargs["enable_thinking"] + self.share_inputs["need_think_end"][ + idx:idx + 1, :] = 1 if kwargs["enable_thinking"] else 0 + + self.share_inputs["reasoning_index"][ + idx:idx + 1, :] = kwargs["reasoning_max_tokens"] + + self.share_inputs["rope_emb"][idx:idx + + 1, :] = self.prepare_rope3d( + position_ids, **kwargs) + + self.share_inputs["top_p"][idx:idx + 1] = kwargs["top_p"] + 
self.share_inputs["temperature"][idx:idx + + 1] = kwargs["temperature"] + self.share_inputs["eos_token_id"][:] = np.array( + task.eos_token_ids).astype("int64").reshape(-1, 1) + self.share_inputs["penalty_score"][idx:idx + + 1] = kwargs["penalty_score"] + self.share_inputs["frequency_score"][idx:idx + + 1] = kwargs["frequency_score"] + self.share_inputs["presence_score"][idx:idx + + 1] = kwargs["presence_score"] + self.share_inputs["seq_lens_decoder"][idx:idx + 1] = 0 + self.share_inputs["step_idx"][idx:idx + 1] = 0 + self.share_inputs["min_dec_len"][idx:idx + 1] = 1 + self.share_inputs["max_dec_len"][idx:idx + + 1] = kwargs["max_length"] + self.share_inputs["stop_flags"][idx:idx + 1] = False + self.share_inputs["pre_ids"][idx:idx + 1] = -1 + encoder_block_num = len(task.get("block_tables")) + self.share_inputs["encoder_block_lens"][idx:idx + + 1] = encoder_block_num + self.share_inputs["block_tables"][idx:idx + 1, :] = -1 + self.share_inputs["block_tables"][ + idx:idx + 1, :encoder_block_num] = np.array(task.block_tables, + dtype="int32") + + def pre_process(self) -> None: + """ + pre_process + """ + if current_platform.is_cuda(): + if self.args.speculative_method is not None: + ( + ids_remove_padding, + padding_offset, + cum_offsets, + cu_seqlens_q, + cu_seqlens_k, + ) = speculate_remove_padding( + max_len=self.args.max_model_len, + input_ids=self.share_inputs["input_ids"], + seq_lens_this_time=self.share_inputs["seq_lens_this_time"], + draft_tokens=self.share_inputs["draft_tokens"], + seq_lens_encoder=self.share_inputs["seq_lens_encoder"]) + else: + ( + ids_remove_padding, + padding_offset, + cum_offsets, + cu_seqlens_q, + cu_seqlens_k, + ) = remove_padding( + max_len=self.args.max_model_len, + input_ids=self.share_inputs["input_ids"], + seq_lens_this_time=self.share_inputs["seq_lens_this_time"]) + self.share_inputs["ids_remove_padding"] = ids_remove_padding + self.share_inputs["padding_offset"] = padding_offset + self.share_inputs["cum_offsets"] = cum_offsets + self.share_inputs["cu_seqlens_q"] = cu_seqlens_q + self.share_inputs["cu_seqlens_k"] = cu_seqlens_k + self.share_inputs["decoder_batch_ids"] = paddle.full( + [self.fd_config.parallel_config.max_num_seqs, 1], 0, dtype='int32') + self.share_inputs["decoder_tile_ids_per_batch"] = paddle.full( + [self.fd_config.parallel_config.max_num_seqs, 1], 0, dtype='int32') + # initialize_forward_meta + self.forward_meta = ForwardMeta( + input_ids=self.share_inputs["input_ids"], + ids_remove_padding=self.share_inputs["ids_remove_padding"], + rotary_embs=self.share_inputs["rope_emb"], + attn_backend=self.attn_backend, + decoder_batch_ids=self.share_inputs["decoder_batch_ids"], + decoder_tile_ids_per_batch=self.share_inputs["decoder_tile_ids_per_batch"], + seq_lens_encoder=self.share_inputs["seq_lens_encoder"], + seq_lens_decoder=self.share_inputs["seq_lens_decoder"], + seq_lens_this_time=self.share_inputs["seq_lens_this_time"], + cum_offsets=self.share_inputs["cum_offsets"], + padding_offset=self.share_inputs["padding_offset"], + cu_seqlens_q=self.share_inputs["cu_seqlens_q"], + cu_seqlens_k=self.share_inputs["cu_seqlens_k"], + block_tables=self.share_inputs["block_tables"], + caches=self.share_inputs["caches"] + ) + self.attn_backend.init_attention_metadata(self.forward_meta) + + self.sampling_metadata = SamplingMetadata( + temperature=self.share_inputs["temperature"], + top_p=self.share_inputs["top_p"], + step_idx=self.share_inputs["step_idx"], + pre_token_ids=self.share_inputs["pre_ids"], + 
frequency_penalties=self.share_inputs["frequency_score"], + presence_penalties=self.share_inputs["presence_score"], + repetition_penalties=self.share_inputs["penalty_score"], + min_dec_lens=self.share_inputs["min_dec_len"], + bad_words_token_ids=self.share_inputs["bad_tokens"], + eos_token_ids=self.share_inputs["eos_token_id"], + ) + + def generate(self) -> None: + """ + generate + """ + self.pre_process() + hiddden_states = self.model(self.share_inputs["ids_remove_padding"], + self.share_inputs["image_features"], + self.forward_meta) + logits = self.model.compute_logits(hiddden_states) + set_value_by_flags_and_idx( + self.share_inputs["pre_ids"], + self.share_inputs["input_ids"], + self.share_inputs["seq_lens_this_time"], + self.share_inputs["seq_lens_encoder"], + self.share_inputs["seq_lens_decoder"], + self.share_inputs["step_idx"], + self.share_inputs["stop_flags"], + ) + # sampler & save_output + next_tokens = self.sampler(logits, self.sampling_metadata) + if self.fd_config.parallel_config.tensor_parallel_degree > 1: + paddle.distributed.broadcast(next_tokens, 0) + self.post_process(next_tokens) + + def post_process(self, next_tokens: paddle.Tensor) -> None: + """ + post_process + """ + if self.share_inputs["enable_thinking"]: + exists_think_end = next_tokens == self.model_cfg.think_end_id + paddle.assign( + paddle.where( + exists_think_end, + self.share_inputs["need_think_end"] - 1, + self.share_inputs["need_think_end"], + ), self.share_inputs["need_think_end"]) + + paddle.assign( + paddle.where( + self.share_inputs["need_think_end"].cast("bool"), + self.share_inputs["reasoning_index"] - 1, + self.share_inputs["reasoning_index"], + ), self.share_inputs["reasoning_index"]) + + stop_wo_think = ( + (next_tokens == self.share_inputs["eos_token_id"]) | + (self.share_inputs["reasoning_index"] == 0)) & ( + self.share_inputs["need_think_end"] > 0) + next_tokens = paddle.where(stop_wo_think, + self.model_cfg.think_end_id, + next_tokens) + paddle.assign( + paddle.where( + stop_wo_think, + self.share_inputs["need_think_end"] - 1, + self.share_inputs["need_think_end"], + ), self.share_inputs["need_think_end"]) + paddle.assign( + paddle.where( + self.share_inputs["stop_flags"], + self.share_inputs["step_idx"], + self.share_inputs["step_idx"] + 1, + ), + self.share_inputs["step_idx"], + ) + length_cond = paddle.greater_equal(self.share_inputs["step_idx"], + self.share_inputs["max_dec_len"]) + paddle.assign( + paddle.logical_or(self.share_inputs["stop_flags"], length_cond), + self.share_inputs["stop_flags"], + ) + + set_stop_value_multi_ends( + next_tokens, + self.share_inputs["stop_flags"], + self.share_inputs["seq_lens_this_time"], + self.share_inputs["eos_token_id"], + self.share_inputs["next_tokens"], + False, + ) # multi ends + # update inputs + with paddle.framework._no_check_dy2st_diff(): + update_inputs( + self.share_inputs["stop_flags"], + self.share_inputs["not_need_stop"], + self.share_inputs["seq_lens_this_time"], + self.share_inputs["seq_lens_encoder"], + self.share_inputs["seq_lens_decoder"], + self.share_inputs["input_ids"], + self.share_inputs["stop_nums"], + next_tokens, + self.share_inputs["is_block_step"], + ) + save_output( + next_tokens, + self.share_inputs["not_need_stop"], + self.rank, + False, # use_ep + ) + + def _cal_theortical_kvcache(self): + """ + Calculate the size of kvcache for computational theory + """ + num_layers = self.model_cfg.get("num_layers", + None) or self.model_cfg.get( + "num_hidden_layers", None) + byte_of_cache = 2 + # support c8 c4 + + hidden_dim = 
self.model_cfg.head_dim * self.model_cfg.kv_num_head + theoretical_kv_cache_memory = (2 * byte_of_cache * + self.args.block_size * num_layers * + hidden_dim) + return theoretical_kv_cache_memory + + def _update_share_input_block_num(self): + """ + Update share_inputs['block_tables'] and share_inputs['free_list'] + """ + num_gpu_blocks = self.num_gpu_blocks + + del self.share_inputs["caches"] + self._init_kvcache() + + del self.share_inputs["block_tables"] + self.share_inputs["block_tables"] = paddle.full( + [self.args.max_num_seqs, num_gpu_blocks], -1, dtype="int32") + + # Init free list + free_list = list( + range(num_gpu_blocks - 1, + int(num_gpu_blocks * self.args.kv_cache_ratio) - 1, -1)) + self.free_list_len = len(free_list) + self.share_inputs.update({ + "free_list": + paddle.to_tensor(free_list, dtype="int32"), + "free_list_len": + paddle.full([1], self.free_list_len, dtype="int32"), + }) + + def dummy_input(self, num_total_tokens: int, number_of_tasks: int) -> None: + """ + fake input to profile + """ + input_length = min(num_total_tokens // number_of_tasks, + self.args.max_model_len - 10) + block_num = (input_length + self.args.block_size - 1 ) // self.args.block_size \ + + self.args.enc_dec_block_num + self.share_inputs["free_list"] = paddle.to_tensor([], dtype="int32") + self.share_inputs["free_list_len"][0] = 0 + + for i in range(number_of_tasks): + idx = i + self.share_inputs["input_ids"][idx:idx + + 1, :input_length] = np.array( + [5] * input_length) + self.share_inputs["eos_token_id"][:] = np.array( + [2], dtype="int64").reshape(-1, 1) + self.share_inputs["seq_lens_this_time"][idx:idx + 1] = input_length + self.share_inputs["step_seq_lens_encoder"][idx:idx + + 1] = input_length + self.share_inputs["seq_lens_encoder"][idx:idx + 1] = input_length + self.share_inputs["seq_lens_decoder"][idx:idx + 1] = 0 + self.share_inputs["step_idx"][idx:idx + 1] = 0 + self.share_inputs["max_dec_len"][idx:idx + 1] = 10 + self.share_inputs["stop_flags"][idx:idx + 1] = False + + self.share_inputs["first_token_ids"][ + idx:idx + 1] = self.share_inputs["input_ids"][idx:idx + 1, :1] + self.share_inputs["ori_seq_lens_encoder"][idx:idx + + 1] = input_length + + self.share_inputs["infer_seed"][idx:idx + 1] = random.randint( + 0, 922337203685477580) + self.share_inputs["encoder_block_lens"][idx:idx + 1] = block_num + self.share_inputs["block_tables"][idx : idx + 1, :block_num] = np.arange(idx * block_num, \ + (idx + 1) * block_num, 1) + + def _preprocess_task(self, one: dict) -> None: + """process batch""" + + input_ids = one["input_ids"][np.newaxis, :] + input_ids = paddle.to_tensor(input_ids, dtype=paddle.int64) + token_type_ids = one["token_type_ids"][np.newaxis, :] + token_type_ids = paddle.to_tensor(token_type_ids, dtype=paddle.int64) + + if one["images"] is not None: + image_type_ids = one["image_type_ids"][np.newaxis, :] + images = one["images"] + image_type_ids = paddle.to_tensor(image_type_ids, + dtype=paddle.int64) + images = paddle.to_tensor(images, dtype="uint8") + grid_thw = paddle.to_tensor(one["grid_thw"], dtype="int64") + else: + image_type_ids = None + images = None + grid_thw = None + + if one["position_ids"] is not None: + position_ids = paddle.to_tensor(one["position_ids"], + dtype="int64").unsqueeze([0]) + else: + position_ids = None + + result = dict( + input_ids=input_ids, + image_type_ids=image_type_ids, + token_type_ids=token_type_ids, + position_ids=position_ids, + grid_thw=grid_thw, + images=images, + ) + return result + + +def build_stream_line_model( + model_path: str, + 
dtype: str, + block_size: int, + max_model_len: int, + tokenizer: ErnieBotTokenizer, + quantization: str = "None", + graph_opt_config: Optional[GraphOptimizationConfig] = None +) -> tuple[FDConfig, paddle.nn.layer]: + """ + build model + """ + import contextlib + + from paddleformers.transformers.configuration_utils import PretrainedConfig + from paddleformers.trl import llm_utils + from paddleformers.utils.log import logger + + from fastdeploy.model_executor.layers.quantization import \ + get_quantization_config + from fastdeploy.model_executor.models.model_base import ModelRegistry + + config, _ = PretrainedConfig.get_config_dict(model_path) + config["head_dim"] = config.get( + "head_dim", config["hidden_size"] // config["num_attention_heads"]) + config["rope_theta"] = config.get("rope_theta", 10000.0) + rope_theta = config["rope_theta"] + model_config = ModelConfig.from_dict(config) + model_config.head_dim = config["head_dim"] + + parallel_config = ParallelConfig() + speculative_config = SpeculativeConfig() + device_config = DeviceConfig() + load_config = LoadConfig() + moe_config = MoEConfig() + kv_cache_config = KVCacheConfig() + kv_cache_config.cache_quant_dtype = "none" + + tensor_parallel_rank, tensor_parallel_degree = llm_utils.init_dist_env() + parallel_config.tensor_parallel_rank = tensor_parallel_rank + parallel_config.tensor_parallel_degree = tensor_parallel_degree + parallel_config.tensor_parallel_degree = tensor_parallel_degree + parallel_config.expert_parallel_degree = 1 + parallel_config.expert_parallel_rank = int(tensor_parallel_rank / + tensor_parallel_degree) + parallel_config.column_cut = False + + speculative_config.is_mtp = False + speculative_config.draft_type = "None" + + # Note(tangbinhan): used for load_checkpoint + model_config.tensor_parallel_rank = parallel_config.tensor_parallel_rank + model_config.tensor_parallel_degree = parallel_config.tensor_parallel_degree + model_config.is_mtp = speculative_config.is_mtp + moe_config.num_experts = None + + # use the length of tokenizer as the origin vocab size + ori_vocab_size = len(tokenizer) + moe_intermediate_size = (config.get("moe_intermediate_size", None), ) + if isinstance(moe_intermediate_size, list) or isinstance( + moe_intermediate_size, tuple): + moe_intermediate_size = moe_intermediate_size[0] + + num_key_value_heads = config.get("num_key_value_heads", -1) + if num_key_value_heads is None: + num_key_value_heads = -1 + + # RL need, some model num_key_value_heads less tensor_parallel_degree, need copy + if num_key_value_heads < tensor_parallel_degree: + logger.warning( + f"key value heads num is {num_key_value_heads}, tensor parallel degree is {tensor_parallel_degree}" + ) + num_key_value_heads = tensor_parallel_degree + + if config.get("ffn_hidden_size", None) is not None: + ffn_hidden_size = config["ffn_hidden_size"] + elif config.get("intermediate_size", None) is not None: + ffn_hidden_size = config["intermediate_size"] + else: + ffn_hidden_size = 4 * config["hidden_size"] + if config["hidden_act"].lower() == "swiglu": + if paddle.distributed.get_world_size() > 1: + multiple_of = 8 * config["num_attention_heads"] + else: + multiple_of = 4 * config["num_attention_heads"] + ffn_hidden_size = multiple_of * ( + (int(2 * ffn_hidden_size / 3) + multiple_of - 1) // + multiple_of) + + num_layers = config.get("num_layers", None) or config.get( + "num_hidden_layers", None) + if num_layers is None: + raise ValueError(f"num_layers<{num_layers}> is invalid") + + remove_tail_layer = config.get("remove_tail_layer") + if 
remove_tail_layer is True: + num_layers -= 1 + elif isinstance(remove_tail_layer, int): + num_layers -= remove_tail_layer + + moe_num_experts = config.get("moe_num_experts", 0) + if isinstance(moe_num_experts, list): + moe_num_experts = max(moe_num_experts) + use_moe = moe_num_experts > 0 + + context = contextlib.nullcontext() + + if config["hidden_act"].lower() == "swiglu": + model_config.hidden_act = "swiglu" + model_config.ffn_hidden_size = ffn_hidden_size + model_config.max_seq_len = max_model_len + model_config.num_layers = num_layers + model_config.dtype = dtype + parallel_config.block_size = block_size + + parallel_config.msg_queue_id = None + model_config.num_key_value_heads = num_key_value_heads + model_config.return_all_hidden_states = False + speculative_config.draft_type = "None" + model_config.start_layer_index = 0 + if use_moe: + moe_config.num_experts = config.get("moe_num_experts", None) + moe_config.moe_intermediate_size = config.get("moe_intermediate_size", + None) + moe_config.top_k = config.get("moe_topk", 8) + moe_config.moe_num_shared_experts = config.get( + "moe_num_shared_experts", 0) + moe_config.moe_layer_start_index = config.get("moe_layer_start_index", + None) + moe_config.moe_layer_end_index = config.get("moe_layer_end_index", + None) + + model_config.moe_phase = MoEPhase(phase="prefill") + model_config.ori_vocab_size = ori_vocab_size + + quantization_config = config.get("quantization_config", None) + + quant_config_name = None + if quantization_config is not None and quantization_config.get( + "quantization", None) is None: + raise ValueError( + "quantization_config should have a key named 'quantization' for specify quant config." + ) + + if quantization_config is not None: + quant_config_name = quantization_config["quantization"] + quant_cls = get_quantization_config(quant_config_name) + quant_config = quant_cls.from_config(quantization_config) + elif quantization != "None": + quantization_config = {} + if use_moe and quantization == "wint4": + quantization_config["dense_quant_type"] = "wint8" + quantization_config["moe_quant_type"] = "wint4" + quant_config_name = "mix_quant" + else: + quant_config_name = quantization + quant_cls = get_quantization_config(quant_config_name) + quant_config = quant_cls.from_config(quantization_config) + else: + quant_config = None + + logger.info("===========quantization_config==============") + if quant_config is not None: + logger.info(f"{quantization_config}") + else: + logger.info( + "No quantization config found and use original weight and act dtype." 
+ ) + logger.info("============================================") + + fd_config = FDConfig( + model_config=model_config, + parallel_config=parallel_config, + speculative_config=speculative_config, + device_config=device_config, + load_config=load_config, + moe_config=moe_config, + quant_config=quant_config, + kv_cache_config=kv_cache_config, + graph_opt_config=graph_opt_config, + ) + fd_config.parallel_config.max_model_len = max_model_len + fd_config.model_config.rope_theta = rope_theta + + with context: + model_cls = ModelRegistry.get_class(model_config.architectures[0]) + model = model_cls(fd_config) + + model.eval() + return fd_config, model diff --git a/fastdeploy/worker/worker_process.py b/fastdeploy/worker/worker_process.py index 32373b3083..2fb52926f6 100644 --- a/fastdeploy/worker/worker_process.py +++ b/fastdeploy/worker/worker_process.py @@ -648,6 +648,15 @@ def initialize_fd_config(args, ranks: int = 1, local_rank: int = 0) -> FDConfig: if getattr(model_config, "num_hidden_layers", None) is None: raise ValueError("num_hidden_layers is None") + # Set MoE phase based on splitwise role + if parallel_config.splitwise_role == "mixed": + parallel_config.moe_phase.phase = "prefill" + elif parallel_config.splitwise_role == "prefill": + parallel_config.moe_phase = "prefill" + elif parallel_config.splitwise_role == "decode": + parallel_config.moe_phase = "decode" + elif parallel_config.splitwise_role is not None: + raise NotImplementedError quantization_config = model_config.quantization_config if not model_config.is_quantized: From 50b78d33973e114c958632ae081c98fb116f70fb Mon Sep 17 00:00:00 2001 From: Wanglongzhi2001 <583087864@qq.com> Date: Tue, 22 Jul 2025 20:45:01 +0800 Subject: [PATCH 02/10] fix comment --- fastdeploy/model_executor/layers/moe/ep.py | 1 - fastdeploy/model_executor/layers/moe/fused_moe_backend_base.py | 2 -- 2 files changed, 3 deletions(-) diff --git a/fastdeploy/model_executor/layers/moe/ep.py b/fastdeploy/model_executor/layers/moe/ep.py index a387f5680e..7c453a6fa6 100644 --- a/fastdeploy/model_executor/layers/moe/ep.py +++ b/fastdeploy/model_executor/layers/moe/ep.py @@ -407,7 +407,6 @@ def combine(self, ffn_out, topk_idx, topk_weights, handle): None, num_experts, ) - print(f'ffn_out shape: {ffn_out.shape}, num_ranks: {8}, num_max_dispatch_tokens_per_rank: {num_max_dispatch_tokens_per_rank}') combined_hidden_states, combine_hook = self.ep_engine.low_latency_combine( ffn_out, topk_idx, topk_weights, handle ) diff --git a/fastdeploy/model_executor/layers/moe/fused_moe_backend_base.py b/fastdeploy/model_executor/layers/moe/fused_moe_backend_base.py index 0de6d1f13c..cc811d68f4 100644 --- a/fastdeploy/model_executor/layers/moe/fused_moe_backend_base.py +++ b/fastdeploy/model_executor/layers/moe/fused_moe_backend_base.py @@ -154,10 +154,8 @@ def apply( """ if layer.ep_size > 1: if layer.fd_config.parallel_config.moe_phase.phase == "prefill": - print("Apply ep prefill") return self.apply_ep_prefill(layer, x, gate_out) elif layer.fd_config.parallel_config.moe_phase.phase == "decode": - print("Apply ep decode") return self.apply_ep_decode(layer, x, gate_out) else: logger.error( From 62db77a9655186c29bdafd817e70193341c60bd9 Mon Sep 17 00:00:00 2001 From: Wanglongzhi2001 <583087864@qq.com> Date: Tue, 22 Jul 2025 20:49:48 +0800 Subject: [PATCH 03/10] fix comment --- fastdeploy/worker/gpu_model_runner.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/fastdeploy/worker/gpu_model_runner.py b/fastdeploy/worker/gpu_model_runner.py index 776f96faeb..a25f3db7c1 100644 --- 
a/fastdeploy/worker/gpu_model_runner.py +++ b/fastdeploy/worker/gpu_model_runner.py @@ -1168,13 +1168,11 @@ class at the server level, which is too granular for ModelRunner. > 1).sum() > 0)) paddle.distributed.broadcast(is_decode_batch, src=0) self.fd_config.parallel_config.moe_phase.phase = "decode" if is_decode_batch else "prefill" - print(f'rank: {self.rank}, is decode batch: {is_decode_batch}, seq_lens_encoder: {self.share_inputs["seq_lens_encoder"]}') # NOTE(wufeisheng): If `not_need_stop`` is False, it means the current worker is in an idle state. # This logic is not used in TP (Tensor Parallelism) mode. However, in EP (Expert Parallelism) mode, # when there is data on other runner, the current runner is required to execute part of the model. if not self.not_need_stop(): - print("got into empty_input") self._execute_empty_input() return None From e6a620dea1750c04fc14edba72e3ee48f9059e87 Mon Sep 17 00:00:00 2001 From: Wanglongzhi2001 <583087864@qq.com> Date: Thu, 24 Jul 2025 13:45:01 +0800 Subject: [PATCH 04/10] update mixep --- fastdeploy/model_executor/layers/moe/ep.py | 55 +++++++++++++++---- .../layers/moe/fused_moe_backend_base.py | 4 ++ fastdeploy/worker/gpu_model_runner.py | 14 +++-- 3 files changed, 58 insertions(+), 15 deletions(-) diff --git a/fastdeploy/model_executor/layers/moe/ep.py b/fastdeploy/model_executor/layers/moe/ep.py index 7c453a6fa6..82c1ed76e7 100644 --- a/fastdeploy/model_executor/layers/moe/ep.py +++ b/fastdeploy/model_executor/layers/moe/ep.py @@ -45,6 +45,8 @@ def __init__( num_experts: int, ep_size: int, ep_rank: int, + splitwise_role: str, + moe_phase: MoEPhase, async_finish: bool = False, ): """ @@ -69,21 +71,40 @@ def __init__( self.prefill_deepep_engine = None self.decode_deepep_engine = None - if moe_phase == MoEPhase.DECODER: + self.ep_config = Config(24, 6, 256) + self.num_max_dispatch_tokens_per_rank = num_max_dispatch_tokens_per_rank + + # In mixed EP mode on a single node, we dynamically switch between + # high throughput and low latency modes. + if splitwise_role == "mixed": + # decode engine logger.info("Initializing Low Latency Buffer") - self.num_max_dispatch_tokens_per_rank = num_max_dispatch_tokens_per_rank self.get_low_latency_buffer() - elif moe_phase == MoEPhase.PREFILL: - self.deepep_engine = deep_ep.Buffer( + # prefill engine + self.prefill_deepep_engine = deep_ep.Buffer( self.group, - int(5e8), + int(1e9), 0, low_latency_mode=False, num_qps_per_rank=1, ) - self.ep_config = Config(24, 6, 256) + # In disaggregated mode on mutiple nodes, we either use + # high throughput mode or low latency mode. 
else: - raise ValueError(f"Unknown generation phase {moe_phase}") + if moe_phase.phase == "decode": + logger.info("Initializing Low Latency Buffer") + self.get_low_latency_buffer() + elif moe_phase.phase == "prefill": + self.prefill_deepep_engine = deep_ep.Buffer( + self.group, + int(1e9), + 0, + low_latency_mode=False, + num_qps_per_rank=1, + ) + else: + raise ValueError(f"Unknown generation phase {moe_phase}") + def get_low_latency_buffer(self): """ @@ -196,8 +217,11 @@ def barrier_all(self): """ barrier_all """ - self.prefill_deepep_engine.barrier_all() - self.decode_deepep_engine.barrier_all() + if self.prefill_deepep_engine is not None: + self.prefill_deepep_engine.barrier_all() + + if self.decode_deepep_engine is not None: + self.decode_deepep_engine.barrier_all() class EPRunner: @@ -210,6 +234,8 @@ def __init__( top_k: int, hidden: int, num_experts: int, + splitwise_role: str, + moe_phase: MoEPhase, num_max_dispatch_tokens_per_rank: int = 1, ep_size: int = 1, ep_rank: int = 0, @@ -225,6 +251,8 @@ def __init__( moe_phase=moe_phase, ep_size=ep_size, ep_rank=ep_rank, + splitwise_role=splitwise_role, + moe_phase=moe_phase, ) def moe_select(self, layer: nn.Layer, gate_out: paddle.Tensor): @@ -285,14 +313,18 @@ def __init__( top_k: int, hidden: int, num_experts: int, + splitwise_role: str, ep_size: int = 1, ep_rank: int = 0, redundant_experts_num: int = 0, + moe_phase: MoEPhase = MoEPhase("prefill"), ): super().__init__( top_k, hidden, num_experts, + splitwise_role, + moe_phase, num_max_dispatch_tokens_per_rank=256, ep_size=ep_size, ep_rank=ep_rank, @@ -356,16 +388,19 @@ def __init__( top_k: int, hidden: int, num_experts: int, + splitwise_role: str, num_max_dispatch_tokens_per_rank: int, ep_size: int = 1, ep_rank: int = 0, redundant_experts_num: int = 0, + moe_phase: MoEPhase = MoEPhase("decode"), ): super().__init__( top_k, hidden, num_experts, - MoEPhase.DECODER, + splitwise_role, + moe_phase, num_max_dispatch_tokens_per_rank, ep_size=ep_size, ep_rank=ep_rank, diff --git a/fastdeploy/model_executor/layers/moe/fused_moe_backend_base.py b/fastdeploy/model_executor/layers/moe/fused_moe_backend_base.py index cc811d68f4..662e825b27 100644 --- a/fastdeploy/model_executor/layers/moe/fused_moe_backend_base.py +++ b/fastdeploy/model_executor/layers/moe/fused_moe_backend_base.py @@ -49,11 +49,13 @@ def init_ep(self, layer: nn.Layer) -> None: from .ep import EPPrefillRunner, EPDecoderRunner self.ep_prefill_runner = EPPrefillRunner( layer.top_k, layer.hidden_size, layer.num_experts, + layer.fd_config.parallel_config.splitwise_role, layer.ep_size, layer.ep_rank) self.ep_decoder_runner = EPDecoderRunner( layer.top_k, layer.hidden_size, layer.num_experts, + layer.fd_config.parallel_config.splitwise_role, layer.fd_config.model_config.num_max_dispatch_tokens_per_rank, layer.ep_size, layer.ep_rank, @@ -64,12 +66,14 @@ def init_ep(self, layer: nn.Layer) -> None: from .ep import EPPrefillRunner self.ep_prefill_runner = EPPrefillRunner( layer.top_k, layer.hidden_size, layer.num_experts, + layer.fd_config.parallel_config.splitwise_role, layer.ep_size, layer.ep_rank) else: from .ep import EPDecoderRunner self.ep_decoder_runner = EPDecoderRunner( layer.top_k, layer.hidden_size, layer.num_experts, layer.moe_config.num_max_dispatch_tokens_per_rank, + layer.fd_config.parallel_config.splitwise_role, layer.ep_size, layer.ep_rank) self.ep_prefill_runner = EPPrefillRunner( diff --git a/fastdeploy/worker/gpu_model_runner.py b/fastdeploy/worker/gpu_model_runner.py index a25f3db7c1..660c8e2214 100644 --- 
a/fastdeploy/worker/gpu_model_runner.py +++ b/fastdeploy/worker/gpu_model_runner.py @@ -1163,11 +1163,15 @@ class at the server level, which is too granular for ModelRunner. We plan to replace it with 'ModelForwardBatch'. intermediate_tensors: """ - # NOTE(wufeisheng): For Expert Parallelism - is_decode_batch = paddle.to_tensor(not ((self.share_inputs["seq_lens_this_time"] - > 1).sum() > 0)) - paddle.distributed.broadcast(is_decode_batch, src=0) - self.fd_config.parallel_config.moe_phase.phase = "decode" if is_decode_batch else "prefill" + is_decode_batch = not ((self.share_inputs["seq_lens_this_time"] + > 1).sum() > 0) + + # mix ep in single node + if self.fd_config.parallel_config.use_ep and self.fd_config.parallel_config.splitwise_role == "mixed": + is_decode_batch_list = [] + paddle.distributed.all_gather_object(is_decode_batch_list, is_decode_batch) + is_decode_batch = all(is_decode_batch_list) + self.fd_config.parallel_config.moe_phase.phase = "decode" if is_decode_batch else "prefill" # NOTE(wufeisheng): If `not_need_stop`` is False, it means the current worker is in an idle state. # This logic is not used in TP (Tensor Parallelism) mode. However, in EP (Expert Parallelism) mode, From f206ac2454eaa768d873c25ab418cb0b12ec35dc Mon Sep 17 00:00:00 2001 From: Wanglongzhi2001 <583087864@qq.com> Date: Thu, 24 Jul 2025 15:09:12 +0800 Subject: [PATCH 05/10] fix conflict --- fastdeploy/config.py | 2 +- .../layers/moe/fused_moe_backend_base.py | 9 ------ fastdeploy/worker/gpu_model_runner.py | 30 +++++++++---------- 3 files changed, 15 insertions(+), 26 deletions(-) diff --git a/fastdeploy/config.py b/fastdeploy/config.py index 6b763dcde2..f610239665 100644 --- a/fastdeploy/config.py +++ b/fastdeploy/config.py @@ -156,7 +156,7 @@ def __init__( ): self.sequence_parallel = False # Whether to enable sequence parallelism. 
self.use_ep = False # Whether to enable Expert Parallelism - self.moe_phase = MoEPhase.PREFILL # Generation phase + self.moe_phase = MoEPhase("prefill") # Generation phase self.msg_queue_id = 1 # mesage queue id self.tensor_parallel_rank = 0 # TP rank ID diff --git a/fastdeploy/model_executor/layers/moe/fused_moe_backend_base.py b/fastdeploy/model_executor/layers/moe/fused_moe_backend_base.py index 662e825b27..8a2a83ad17 100644 --- a/fastdeploy/model_executor/layers/moe/fused_moe_backend_base.py +++ b/fastdeploy/model_executor/layers/moe/fused_moe_backend_base.py @@ -76,15 +76,6 @@ def init_ep(self, layer: nn.Layer) -> None: layer.fd_config.parallel_config.splitwise_role, layer.ep_size, layer.ep_rank) - self.ep_prefill_runner = EPPrefillRunner( - layer.top_k, - layer.hidden_size, - layer.num_experts, - layer.ep_size, - layer.ep_rank, - layer.fd_config.model_config.redundant_experts_num, - ) - def process_loaded_weights(self, layer, weights) -> None: """ process_loaded_weights diff --git a/fastdeploy/worker/gpu_model_runner.py b/fastdeploy/worker/gpu_model_runner.py index 660c8e2214..a77770575a 100644 --- a/fastdeploy/worker/gpu_model_runner.py +++ b/fastdeploy/worker/gpu_model_runner.py @@ -475,7 +475,7 @@ def _dummy_prefill_inputs(self, num_tokens: int, batch_size: int, expected_decod for i in range(batch_size): idx = i self.share_inputs["input_ids"][idx : idx + 1, :input_length] = np.array([5] * input_length) - self.share_inputs["prompt_ids"][idx : idx + 1, :input_length] = np.array([5] * input_length) + # self.share_inputs["prompt_ids"][idx : idx + 1, :input_length] = np.array([5] * input_length) self.share_inputs["eos_token_id"][:] = np.array([2], dtype="int64").reshape(-1, 1) self.share_inputs["seq_lens_this_time"][idx : idx + 1] = input_length self.share_inputs["step_seq_lens_encoder"][idx : idx + 1] = input_length @@ -794,8 +794,16 @@ def initialize_forward_meta(self): # Update Batch type for cuda graph # TODO(gongshaotian): Use seq_lens_encoder to set is_decode_batch is_decode_batch = not ((self.share_inputs["seq_lens_this_time"] > 1).sum() > 0) + + # mix ep in single node + if self.fd_config.parallel_config.use_ep and self.fd_config.parallel_config.splitwise_role == "mixed": + is_decode_batch_list = [] + paddle.distributed.all_gather_object(is_decode_batch_list, is_decode_batch) + is_decode_batch = all(is_decode_batch_list) + self.fd_config.parallel_config.moe_phase.phase = "decode" if is_decode_batch else "prefill" + self.forward_meta.step_use_cudagraph = self.use_cudagraph and is_decode_batch - + # Initialzie attention meta data for attn_backend in self.attn_backends: attn_backend.init_attention_metadata(self.forward_meta) @@ -1163,15 +1171,10 @@ class at the server level, which is too granular for ModelRunner. We plan to replace it with 'ModelForwardBatch'. intermediate_tensors: """ - is_decode_batch = not ((self.share_inputs["seq_lens_this_time"] - > 1).sum() > 0) - - # mix ep in single node - if self.fd_config.parallel_config.use_ep and self.fd_config.parallel_config.splitwise_role == "mixed": - is_decode_batch_list = [] - paddle.distributed.all_gather_object(is_decode_batch_list, is_decode_batch) - is_decode_batch = all(is_decode_batch_list) - self.fd_config.parallel_config.moe_phase.phase = "decode" if is_decode_batch else "prefill" + # 1. Prepare inputs of model and sampler. 
+ skip_idx_list = self._get_skip_idx(model_forward_batch) + self._prepare_inputs() + self.sampler.pre_process(skip_idx_list) # NOTE(wufeisheng): If `not_need_stop`` is False, it means the current worker is in an idle state. # This logic is not used in TP (Tensor Parallelism) mode. However, in EP (Expert Parallelism) mode, @@ -1180,11 +1183,6 @@ class at the server level, which is too granular for ModelRunner. self._execute_empty_input() return None - # 1. Prepare inputs of model and sampler. - skip_idx_list = self._get_skip_idx(model_forward_batch) - self._prepare_inputs() - self.sampler.pre_process(skip_idx_list) - # 2. Padding inputs for cuda graph self.padding_cudagraph_inputs() From 9ede5a23ddb24c622d577aa2587c61d6ef300678 Mon Sep 17 00:00:00 2001 From: Wanglongzhi2001 <583087864@qq.com> Date: Thu, 24 Jul 2025 15:15:57 +0800 Subject: [PATCH 06/10] fix typo --- fastdeploy/model_executor/layers/moe/ep.py | 15 - fastdeploy/worker/gpu_model_runner.py | 4 +- fastdeploy/worker/vl_gpu_model_runner.py | 1256 -------------------- 3 files changed, 2 insertions(+), 1273 deletions(-) delete mode 100644 fastdeploy/worker/vl_gpu_model_runner.py diff --git a/fastdeploy/model_executor/layers/moe/ep.py b/fastdeploy/model_executor/layers/moe/ep.py index 82c1ed76e7..3c196306e2 100644 --- a/fastdeploy/model_executor/layers/moe/ep.py +++ b/fastdeploy/model_executor/layers/moe/ep.py @@ -427,21 +427,6 @@ def dispatch( return recv_hidden_states, recv_expert_count, handle def combine(self, ffn_out, topk_idx, topk_weights, handle): - # TODO(@wufeisheng): Delete them when deepep in PaddlePaddle is fixed - ( - src_info, - layout_range, - num_max_dispatch_tokens_per_rank, - num_experts, - ) = handle - - handle = ( - src_info, - layout_range, - num_max_dispatch_tokens_per_rank, - None, - num_experts, - ) combined_hidden_states, combine_hook = self.ep_engine.low_latency_combine( ffn_out, topk_idx, topk_weights, handle ) diff --git a/fastdeploy/worker/gpu_model_runner.py b/fastdeploy/worker/gpu_model_runner.py index a77770575a..996d1ff04f 100644 --- a/fastdeploy/worker/gpu_model_runner.py +++ b/fastdeploy/worker/gpu_model_runner.py @@ -23,7 +23,7 @@ from paddle import nn from paddleformers.utils.log import logger -from fastdeploy.config import FDConfig, MoEPhase +from fastdeploy.config import FDConfig from fastdeploy.engine.request import Request, RequestType from fastdeploy.model_executor.graph_optimization.utils import ( profile_run_guard, @@ -475,7 +475,7 @@ def _dummy_prefill_inputs(self, num_tokens: int, batch_size: int, expected_decod for i in range(batch_size): idx = i self.share_inputs["input_ids"][idx : idx + 1, :input_length] = np.array([5] * input_length) - # self.share_inputs["prompt_ids"][idx : idx + 1, :input_length] = np.array([5] * input_length) + self.share_inputs["prompt_ids"][idx : idx + 1, :input_length] = np.array([5] * input_length) self.share_inputs["eos_token_id"][:] = np.array([2], dtype="int64").reshape(-1, 1) self.share_inputs["seq_lens_this_time"][idx : idx + 1] = input_length self.share_inputs["step_seq_lens_encoder"][idx : idx + 1] = input_length diff --git a/fastdeploy/worker/vl_gpu_model_runner.py b/fastdeploy/worker/vl_gpu_model_runner.py deleted file mode 100644 index 43592441a0..0000000000 --- a/fastdeploy/worker/vl_gpu_model_runner.py +++ /dev/null @@ -1,1256 +0,0 @@ -""" -# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License" -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -""" -import argparse -import json -import os -import random -from typing import Optional - -import numpy as np -import paddle -import paddle.distributed.fleet as fleet -from paddleformers.transformers.model_utils import load_tp_checkpoint -from safetensors import safe_open - -from fastdeploy.config import (DeviceConfig, FDConfig, GraphOptimizationConfig, - KVCacheConfig, LoadConfig, ModelConfig, - MoEConfig, MoEPhase, ParallelConfig, - SpeculativeConfig) -from fastdeploy.input.ernie_tokenizer import ErnieBotTokenizer -from fastdeploy.input.mm_processor import DataProcessor -from fastdeploy.model_executor.layers.attention import get_attention_backend -from fastdeploy.model_executor.layers.rotary_embedding import get_rope_3d -from fastdeploy.model_executor.layers.sample.meta_data import SamplingMetadata -from fastdeploy.model_executor.layers.sample.sampler import Sampler -from fastdeploy.model_executor.models.ernie4_5_moe import \ - Ernie4_5_PretrainedModel -from fastdeploy.model_executor.models.ernie4_5_vl.configuration import \ - Ernie4_5_VLMoeConfig -from fastdeploy.model_executor.models.ernie4_5_vl.dfnrope import \ - DFNRopeVisionTransformerConfig -from fastdeploy.model_executor.models.ernie4_5_vl.dfnrope.modeling import \ - DFNRopeVisionTransformerPretrainedModel -from fastdeploy.model_executor.models.ernie4_5_vl.modeling_resampler import ( - ScatterOp, VariableResolutionResamplerModel) -from fastdeploy.platforms import current_platform -from fastdeploy.worker.forward_meta import ForwardMeta -from fastdeploy.worker.utils import check_safetensors_model -from fastdeploy.worker.vl_model_runner_base import VLModelRunnerBase - -if current_platform.is_cuda() and current_platform.available(): - from fastdeploy.model_executor.layers.utils import ( - remove_padding, speculate_remove_padding) - -from fastdeploy.model_executor.ops.gpu import (save_output, - set_stop_value_multi_ends, - set_value_by_flags_and_idx, - update_inputs) - - -class GPUVLModelRunner(VLModelRunnerBase): - """ - The GPUVLModelRunner class for vision-language tasks on GPU. 
- """ - - def __init__( - self, - config: ModelConfig, - args: argparse.Namespace, - nranks: int, - rank: int, - ) -> None: - """ - GPUVLModelRunner init - """ - self.nranks = nranks - self.rank = rank - - hcg = fleet.get_hybrid_communicate_group() - self.tensor_parallel_degree = max(hcg.get_model_parallel_world_size(), - 1) - self.tensor_parallel_rank = hcg.get_model_parallel_rank() - self.mp_src_rank = hcg.get_model_parallel_group_src_rank() - self.mp_group = hcg.get_model_parallel_group() - self.is_safetensors_model = check_safetensors_model( - args.model_name_or_path) - - model_path = os.path.dirname(args.model_name_or_path) - args.llm_model_name_or_path = args.model_name_or_path - if not self.is_safetensors_model: - args.tokenizer = args.image_preprocessor = model_path - else: - args.tokenizer = args.image_preprocessor = args.model_name_or_path - args.vision_model_name_or_path = os.path.join( - model_path, "DFNRopeVisionTransformer") - - self.amp_black = [ - "reduce_sum", - "c_softmax_with_cross_entropy", - "elementwise_div", - "sin", - "cos", - "sort", - "multinomial", - ] - self.amp_white = [ - "lookup_table", - "lookup_table_v2", - "flash_attn", - "matmul", - "matmul_v2", - "fused_gemm_epilogue", - ] - - super().__init__(config, args) - self.init_extra_input(config, args) - - self._reset_paddle_env() - - self.sampler = Sampler() - - def _reset_paddle_env(self): - pass - - def update_chunked_prefill(self, tasks: list[any]) -> None: - """ - update chunked prefill - """ - if not self.args.enable_chunked_prefill: - return - - for task in tasks: - if task.chunk_idx > len(task.prefill_chunk_info): - continue - - idx = task.idx - if task.chunk_idx == len(task.prefill_chunk_info): - self.share_inputs["seq_lens_this_time"][idx:idx + 1] = 1 - self.share_inputs['seq_lens_encoder'][idx:idx + 1] = 0 - self.share_inputs["seq_lens_decoder"][idx:idx + - 1] = task.start_idx - self.share_inputs["step_idx"][idx:idx + 1] = 1 - else: - inputs = self._preprocess_task( - task.prefill_chunk_info[task.chunk_idx]) - if inputs.get("images") is not None: - self.share_inputs[ - "image_features"] = self.extract_vision_features( - inputs) - else: - # Compatible with the situation that lacks images and videos - self.share_inputs["image_features"] = None - - token_chunk_size = inputs["input_ids"].shape[1] - self.share_inputs["input_ids"][ - idx:idx + 1, :token_chunk_size] = inputs["input_ids"] - self.share_inputs["seq_lens_this_time"][idx:idx + - 1] = token_chunk_size - self.share_inputs['seq_lens_encoder'][idx:idx + - 1] = token_chunk_size - self.share_inputs["seq_lens_decoder"][idx:idx + - 1] = task.start_idx - self.share_inputs["step_idx"][idx:idx + 1] = 0 - - task.start_idx += token_chunk_size - task.chunk_idx += 1 - - def _load_model( - self, - model_name: str, - dynamic_load_weight: int = 0, - ) -> None: - """ - Load the model from the given model name. 
- """ - - vocab_file_names = [ - "tokenizer.model", "spm.model", "ernie_token_100k.model" - ] - for i in range(len(vocab_file_names)): - if os.path.exists( - os.path.join(self.args.tokenizer, vocab_file_names[i])): - ErnieBotTokenizer.resource_files_names[ - "vocab_file"] = vocab_file_names[i] - break - - tokenizer = ErnieBotTokenizer.from_pretrained( - self.args.tokenizer, - model_max_length=self.args.max_model_len, - padding_side="right", - use_fast=False, - ) - tokenizer.ignored_index = -100 - if tokenizer.pad_token is None: - tokenizer.pad_token = tokenizer.unk_token - - config = Ernie4_5_VLMoeConfig.from_pretrained( - self.args.llm_model_name_or_path, - tensor_parallel_degree=self.tensor_parallel_degree, - tensor_parallel_rank=self.tensor_parallel_rank, - moe_group="dummy", - ) - self.model_cfg = config - if self.is_safetensors_model: - meta_json = os.path.join(self.args.model_name_or_path, - "model.safetensors.index.json") - if os.path.exists(meta_json): - with open( - os.path.join(self.args.model_name_or_path, - "model.safetensors.index.json"), - "r") as f: - self.weight_map = json.load(f)["weight_map"] - else: - self.weight_map = {} - with safe_open(os.path.join(self.args.model_name_or_path, - "model.safetensors"), - framework="np") as f: - keys = f.keys() - for k in keys: - self.weight_map[k] = "model.safetensors" - - if self.is_safetensors_model: - vision_config = config.vision_config - vision_config.tensor_parallel_degree = self.tensor_parallel_degree - vision_config.tensor_parallel_rank = self.tensor_parallel_rank - vision_config.attn_sep = False - vision_config.dtype = "bfloat16" - else: - vision_config = DFNRopeVisionTransformerConfig.from_pretrained( - self.args.vision_model_name_or_path, - tensor_parallel_degree=self.tensor_parallel_degree, - tensor_parallel_rank=self.tensor_parallel_rank, - attn_sep=False, - dtype="bfloat16", - ) - config.vision_config = vision_config - self.vision_config = vision_config - config.pixel_hidden_size = config.vision_config.hidden_size - config.im_patch_id = tokenizer.get_vocab()["<|IMAGE_PLACEHOLDER|>"] - config.think_end_id = tokenizer.get_vocab()[""] - config.max_text_id = config.im_patch_id - - config.sequence_parallel = False - - self.dtype = self.args.dtype - paddle.set_default_dtype(self.dtype) - - self.vision_model, self.resampler_model = self.inject_pp_vision_model( - self.args, config) - - processor = DataProcessor( - tokenizer_name=self.args.tokenizer, - image_preprocessor_name=str(self.args.image_preprocessor), - ) - processor.eval() - image_preprocess = processor.image_preprocessor - image_preprocess.image_mean_tensor = paddle.to_tensor( - image_preprocess.image_mean, dtype="float32").reshape([1, 3, 1, 1]) - image_preprocess.image_std_tensor = paddle.to_tensor( - image_preprocess.image_std, dtype="float32").reshape([1, 3, 1, 1]) - image_preprocess.rescale_factor = paddle.to_tensor( - image_preprocess.rescale_factor, dtype="float32") - image_preprocess.image_mean_tensor = image_preprocess.image_mean_tensor.squeeze( - [-2, -1]).repeat_interleave(config.vision_config.patch_size**2 * 1, - -1) - image_preprocess.image_std_tensor = image_preprocess.image_std_tensor.squeeze( - [-2, -1]).repeat_interleave(config.vision_config.patch_size**2 * 1, - -1) - self.image_preprocess = image_preprocess - - graph_opt_config = GraphOptimizationConfig( - self.args.enable_static_graph_inference, self.args.use_cudagraph, - self.args.max_capture_batch_size) - - fd_config, self.model = build_stream_line_model( - self.args.model_name_or_path, - 
self.args.dtype, - self.args.block_size, - max_model_len=self.args.max_model_len, - tokenizer=tokenizer, - quantization=self.args.quantization, - graph_opt_config=graph_opt_config, - ) - self.model.eval() - self.set_state_dict(self.args) - - fd_config.parallel_config.max_model_len = fd_config.model_config.max_seq_len - self.fd_config = fd_config - attn_backend_cls = get_attention_backend() - num_heads = self.fd_config.model_config.num_attention_heads // \ - self.fd_config.parallel_config.tensor_parallel_degree - self.fd_config.model_config.kv_num_heads = int( - self.fd_config.model_config.num_key_value_heads - ) // self.fd_config.parallel_config.tensor_parallel_degree - head_dim = self.fd_config.model_config.head_dim - self.attn_backend = attn_backend_cls( - self.fd_config, - kv_num_heads=self.fd_config.model_config.kv_num_heads, - num_heads=num_heads, - head_dim=head_dim) - self._init_kvcache() - - def init_extra_input(self, config: ModelConfig, args: argparse.Namespace) -> None: - """ - Initialize extra input tensors. - """ - head_dim = self.model_cfg.head_dim - self.share_inputs.update({ - "rope_emb": - paddle.full(shape=[ - args.max_num_seqs, 2, 1, self.max_length, 1, head_dim // 2 - ], - fill_value=0, - dtype="float32") - }) - self.share_inputs.update({"image_features": None}) - self.share_inputs.update({ - "need_think_end": - paddle.full(shape=[args.max_num_seqs, 1], - fill_value=0, - dtype="int32") - }) - self.share_inputs.update({ - "enable_thinking": - paddle.full(shape=[1], fill_value=True, dtype="bool") - }) - self.share_inputs.update({ - "reasoning_index": - paddle.full(shape=[args.max_num_seqs, 1], - fill_value=0, - dtype="int32") - }) - - def init_rotary_position_embedding(self, max_model_len: int) -> None: - """ - Init rotary position embedding - """ - pass - - def _init_kvcache(self): - """ - Init kv cache - """ - cache_kvs = {} - total_block_num = self.num_gpu_blocks - num_layers = self.model_cfg.get("num_layers", - None) or self.model_cfg.get( - "num_hidden_layers", None) - - kv_num_head = self.model_cfg.get( - "num_key_value_heads", - self.model_cfg.num_attention_heads, - ) - kv_num_head = kv_num_head // self.tensor_parallel_degree - self.model_cfg.kv_num_head = kv_num_head - - for i in range(num_layers): - cache_type = self.args.dtype - cache_kvs["key_caches_{}".format(i)] = paddle.full( - shape=[ - total_block_num, - kv_num_head, - self.args.block_size, - self.model_cfg.head_dim, - ], - fill_value=0, - dtype=cache_type, - ) - cache_kvs["value_caches_{}".format(i)] = paddle.full( - shape=[ - total_block_num, - kv_num_head, - self.args.block_size, - self.model_cfg.head_dim, - ], - fill_value=0, - dtype=cache_type, - ) - - self.share_inputs["caches"] = list(cache_kvs.values()) - for value in cache_kvs.values(): - del value - paddle.device.cuda.empty_cache() - - def clear_parameters(self, pid: int) -> None: - """ clear_parameters """ - if "caches" in self.share_inputs: - self.model.clear_parameters(pid) - del self.share_inputs["caches"] - paddle.device.cuda.empty_cache() - self.model.log_memory_usage("clear all memory") - - def update_parameters(self, pid: int) -> None: - """ update_parameters """ - if "caches" not in self.share_inputs: - self.model.update_parameters(pid) - self._init_kvcache() - self.model.log_memory_usage("update all memory") - - @paddle.no_grad() - def set_state_dict(self, args: argparse.Namespace) -> None: - """set_state_dict""" - if not self.is_safetensors_model: - rank_model_paths = [] - for root, dirs, files in 
os.walk(self.args.llm_model_name_or_path): - for file in files: - if file == f"model_state.tp0{self.tensor_parallel_rank}.pdparams": - rank_model_paths.append(os.path.join(root, file)) - elif file == "model_state.pdparams": - rank_model_paths.append(os.path.join(root, file)) - state_dict = {} - for path in rank_model_paths: - loaded_dict = paddle.load(path, return_numpy=True) - state_dict.update(loaded_dict) - - resampler_state = {} - for key in list(state_dict.keys()): - if "vision" in key: - state_dict.pop(key) - if key.startswith("ernie.resampler_model."): - value = state_dict.pop(key) - value = paddle.to_tensor(value).cast("bfloat16") - value = value.numpy() - resampler_state[ - key[len("ernie.resampler_model."):]] = value - elif key.startswith("resampler_model."): - value = state_dict.pop(key) - value = paddle.to_tensor(value).cast("bfloat16") - value = value.numpy() - resampler_state[key[len("resampler_model."):]] = value - self.model.set_state_dict(state_dict) - self.resampler_model.set_state_dict(resampler_state) - else: - state_dict = load_tp_checkpoint( - args.model_name_or_path, - Ernie4_5_PretrainedModel, - self.model_cfg, - return_numpy=True, - ) - for key in list(state_dict.keys()): - if key.startswith("vision_model.") or key.startswith( - "ernie.resampler_model."): - state_dict.pop(key) - self.model.set_state_dict(state_dict) - - @paddle.no_grad() - def vit_load( - self, - model_path: str, - tensor_parallel_degree: int, - tensor_parallel_rank: int, - ) -> None: - """ - Load vit tp weight - """ - if tensor_parallel_degree == 1: - rank_model_path = os.path.join(model_path, "model_state.pdparams") - else: - rank_model_path = os.path.join( - model_path, f"model_state_tp0{tensor_parallel_rank}.pdparams") - if os.path.exists(rank_model_path): - return paddle.load(rank_model_path, return_numpy=True) - else: - raise ValueError(f"No such a file {rank_model_path}") - - @paddle.no_grad() - def inject_pp_vision_model(self, args: argparse.Namespace, cfg: Ernie4_5_VLMoeConfig): - """ - Inject pp vision model - """ - - def set_vision_state_dict(model, - tensor_parallel_degree: int=8, - tensor_parallel_rank: int=0, - name: str=""): - """ - Set vision model weight - """ - model_state_dict = model.state_dict() - compat_keys = [name + k for k in model_state_dict.keys()] - model_files = set() - for k in compat_keys: - if k in self.weight_map.keys(): - model_files.add( - os.path.join(args.model_name_or_path, - self.weight_map[k])) - state_dict = {} - for model_file in model_files: - with safe_open(model_file, framework="np") as f: - for k in f.keys(): - if k in compat_keys: - new_k = k.replace(name, "") - tensor = f.get_tensor(k) - if tensor_parallel_degree > 1: - if "resampler_model" in name and new_k == "spatial_linear.0.weight": - tensor = np.split( - tensor, tensor_parallel_degree, - axis=0)[tensor_parallel_rank] - elif name == "vision_model.": - if "attn.proj.weight" in new_k or "fc2.weight" in new_k: - tensor = np.split( - tensor, - tensor_parallel_degree, - axis=0)[tensor_parallel_rank] - elif "fc1.weight" in new_k or "fc1.bias" in new_k: - tensor = np.split( - tensor, - tensor_parallel_degree, - axis=-1)[tensor_parallel_rank] - elif "qkv.weight" in new_k: - head_dim = self.vision_config.hidden_size // self.vision_config.num_heads - tensor = tensor.reshape([ - self.vision_config.hidden_size, 3, - self.vision_config.num_heads, - head_dim - ]) - tensor = np.split( - tensor, - tensor_parallel_degree, - axis=-2 - )[tensor_parallel_rank].reshape([ - self.vision_config.hidden_size, -1 - ]) - 
elif "qkv.bias" in new_k: - head_dim = self.vision_config.hidden_size // self.vision_config.num_heads - tensor = tensor.reshape([ - 3, self.vision_config.num_heads, - head_dim - ]) - tensor = np.split( - tensor, - tensor_parallel_degree, - axis=-2 - )[tensor_parallel_rank].reshape([-1]) - state_dict[new_k] = tensor - model.set_state_dict(state_dict) - - vision_model = DFNRopeVisionTransformerPretrainedModel( - cfg.vision_config) - vision_model = paddle.amp.decorate(models=vision_model, - level="O2", - dtype="bfloat16") - vision_model.eval() - if not self.is_safetensors_model: - vit_state_dict = self.vit_load(args.vision_model_name_or_path, - self.tensor_parallel_degree, - self.tensor_parallel_rank) - vision_model.set_state_dict(vit_state_dict) - else: - set_vision_state_dict( - vision_model, - tensor_parallel_degree=self.tensor_parallel_degree, - tensor_parallel_rank=self.tensor_parallel_rank, - name="vision_model.", - ) - - resampler_model = VariableResolutionResamplerModel( - cfg.pixel_hidden_size, - cfg.hidden_size, - cfg.spatial_conv_size, - cfg.temporal_conv_size, - config=cfg, - ) - resampler_model = paddle.amp.decorate(models=resampler_model, - level="O2", - dtype="bfloat16") - resampler_model.eval() - if self.is_safetensors_model: - is_ernie_begin = False - for k in self.weight_map.keys(): - if k.startswith("ernie.resampler_model."): - is_ernie_begin = True - set_vision_state_dict( - resampler_model, - tensor_parallel_degree=self.tensor_parallel_degree, - tensor_parallel_rank=self.tensor_parallel_rank, - name="ernie.resampler_model." - if is_ernie_begin else "resampler_model.", - ) - return vision_model, resampler_model - - @paddle.no_grad() - def extract_vision_features(self, inputs: list[paddle.Tensor]) -> paddle.Tensor: - """extract_vision_features""" - assert inputs["images"] is not None - grid_thw = inputs["grid_thw"] - - images = inputs["images"].cast("float32") - images = self.image_preprocess.rescale_factor * images - self.image_preprocess.image_mean_tensor - images = images / self.image_preprocess.image_std_tensor - images = images.cast("bfloat16") - - token_type_ids = inputs["token_type_ids"] - token_type_ids_w_video = token_type_ids - input_ids = inputs["input_ids"] - # convert to img patch id - image_mask = input_ids == self.model_cfg.im_patch_id - image_type_ids = inputs["image_type_ids"] - with paddle.amp.auto_cast( - True, - custom_black_list=self.amp_black, - custom_white_list=self.amp_white, - level="O2", - dtype=self.dtype, - ): - image_features = self.vision_model.extract_feature( - images, grid_thw) - if self.tensor_parallel_degree > 1: - S, C = image_features.shape - image_features = image_features.reshape( - [-1, C * self.model_cfg.spatial_conv_size**2]) - image_features = ScatterOp.apply(image_features, - axis=-1) # mp 切 Fea - image_features = image_features.reshape([S, -1]) - image_features = self.resampler_model( - image_features, - image_mask, - token_type_ids_w_video, - image_type_ids, - grid_thw, - ) - return image_features - - @paddle.no_grad() - def prepare_rope3d(self, position_ids: paddle.Tensor, **kwargs) -> paddle.Tensor: - """prepare_rope3d""" - - prefix_max_position_ids = paddle.max(position_ids) + 1 - dec_pos_ids = paddle.tile( - paddle.arange(kwargs["max_length"], - dtype="int64").unsqueeze(0).unsqueeze(-1), [1, 1, 3]) - dec_pos_ids = dec_pos_ids + prefix_max_position_ids - position_ids_3d_real = paddle.concat([position_ids, dec_pos_ids], - axis=1) - - rope_emb = get_rope_3d( - position_ids=position_ids_3d_real, - 
rotary_dim=self.model_cfg.head_dim, - paritial_rotary_factor=1.0, - base=self.model_cfg.rope_theta, - max_position=self.args.max_model_len, - freq_allocation=self.model_cfg.freq_allocation, - ) - return rope_emb - - def prefill_finished(self): - """ - Verify prefill operation completion - """ - prefill_statue = (self.share_inputs["seq_lens_this_time"] != 0) & ( - self.share_inputs["seq_lens_this_time"] != 1) - return not paddle.any(prefill_statue).numpy() - - def dy_input_preprocess(self, tasks: list[any]) -> None: - """ - dynamic insertion - """ - - def get_numeric_value(task, key, default_value): - if task.get(key, None) is not None: - return task.get(key) - else: - return default_value - - for i in range(len(tasks)): - task = tasks[i] - idx = task.idx - - kwargs = { - "max_length": - get_numeric_value(task, "max_tokens", 2048), - "top_p": - get_numeric_value(task, "top_p", 0.8), - "temperature": - get_numeric_value(task, "temperature", 0.2), - "top_k": - get_numeric_value(task, "top_k", 0), - "penalty_score": - get_numeric_value(task, "repetition_penalty", 1.0), - "frequency_score": - get_numeric_value(task, "frequency_penalty", 0.0), - "presence_score": - get_numeric_value(task, "presence_penalty", 0.0), - "decode_strategy": - "sampling", - "pad_token_id": - self.args.pad_token_id, - "enable_thinking": - get_numeric_value(task, "enable_thinking", True), - "reasoning_max_tokens": - get_numeric_value(task, "reasoning_max_tokens", 2048), - } - - if self.args.enable_chunked_prefill: - task.set("chunk_idx", 1) - inputs = self._preprocess_task(task.prefill_chunk_info[0]) - if inputs.get("images") is not None: - self.share_inputs[ - "image_features"] = self.extract_vision_features( - inputs) - else: - # Compatible with the situation that lacks images and videos - self.share_inputs["image_features"] = None - if task.multimodal_inputs["position_ids"] is not None: - position_ids = paddle.to_tensor( - task.multimodal_inputs["position_ids"], - dtype="int64").unsqueeze([0]) - else: - position_ids = None - - token_chunk_size = inputs["input_ids"].shape[1] - task.set("start_idx", token_chunk_size) - self.share_inputs["input_ids"][ - idx:idx + 1, :token_chunk_size] = inputs["input_ids"] - self.share_inputs["seq_lens_this_time"][idx:idx + - 1] = token_chunk_size - self.share_inputs["seq_lens_encoder"][idx:idx + - 1] = token_chunk_size - self.share_inputs["step_seq_lens_encoder"][ - idx:idx + 1] = token_chunk_size - else: - inputs = self._preprocess_task(task.multimodal_inputs) - if inputs.get("images") is not None: - self.share_inputs[ - "image_features"] = self.extract_vision_features( - inputs) - else: - # Compatible with the situation that lacks images and videos - self.share_inputs["image_features"] = None - position_ids = inputs["position_ids"] - - length = inputs["input_ids"].shape[1] - self.share_inputs["input_ids"][ - idx:idx + 1, :length] = inputs["input_ids"] - self.share_inputs["seq_lens_this_time"][idx:idx + 1] = length - self.share_inputs["seq_lens_encoder"][idx:idx + 1] = length - self.share_inputs["step_seq_lens_encoder"][idx:idx + - 1] = length - - # force - self.share_inputs["enable_thinking"][:] = kwargs["enable_thinking"] - self.share_inputs["need_think_end"][ - idx:idx + 1, :] = 1 if kwargs["enable_thinking"] else 0 - - self.share_inputs["reasoning_index"][ - idx:idx + 1, :] = kwargs["reasoning_max_tokens"] - - self.share_inputs["rope_emb"][idx:idx + - 1, :] = self.prepare_rope3d( - position_ids, **kwargs) - - self.share_inputs["top_p"][idx:idx + 1] = kwargs["top_p"] - 
self.share_inputs["temperature"][idx:idx + - 1] = kwargs["temperature"] - self.share_inputs["eos_token_id"][:] = np.array( - task.eos_token_ids).astype("int64").reshape(-1, 1) - self.share_inputs["penalty_score"][idx:idx + - 1] = kwargs["penalty_score"] - self.share_inputs["frequency_score"][idx:idx + - 1] = kwargs["frequency_score"] - self.share_inputs["presence_score"][idx:idx + - 1] = kwargs["presence_score"] - self.share_inputs["seq_lens_decoder"][idx:idx + 1] = 0 - self.share_inputs["step_idx"][idx:idx + 1] = 0 - self.share_inputs["min_dec_len"][idx:idx + 1] = 1 - self.share_inputs["max_dec_len"][idx:idx + - 1] = kwargs["max_length"] - self.share_inputs["stop_flags"][idx:idx + 1] = False - self.share_inputs["pre_ids"][idx:idx + 1] = -1 - encoder_block_num = len(task.get("block_tables")) - self.share_inputs["encoder_block_lens"][idx:idx + - 1] = encoder_block_num - self.share_inputs["block_tables"][idx:idx + 1, :] = -1 - self.share_inputs["block_tables"][ - idx:idx + 1, :encoder_block_num] = np.array(task.block_tables, - dtype="int32") - - def pre_process(self) -> None: - """ - pre_process - """ - if current_platform.is_cuda(): - if self.args.speculative_method is not None: - ( - ids_remove_padding, - padding_offset, - cum_offsets, - cu_seqlens_q, - cu_seqlens_k, - ) = speculate_remove_padding( - max_len=self.args.max_model_len, - input_ids=self.share_inputs["input_ids"], - seq_lens_this_time=self.share_inputs["seq_lens_this_time"], - draft_tokens=self.share_inputs["draft_tokens"], - seq_lens_encoder=self.share_inputs["seq_lens_encoder"]) - else: - ( - ids_remove_padding, - padding_offset, - cum_offsets, - cu_seqlens_q, - cu_seqlens_k, - ) = remove_padding( - max_len=self.args.max_model_len, - input_ids=self.share_inputs["input_ids"], - seq_lens_this_time=self.share_inputs["seq_lens_this_time"]) - self.share_inputs["ids_remove_padding"] = ids_remove_padding - self.share_inputs["padding_offset"] = padding_offset - self.share_inputs["cum_offsets"] = cum_offsets - self.share_inputs["cu_seqlens_q"] = cu_seqlens_q - self.share_inputs["cu_seqlens_k"] = cu_seqlens_k - self.share_inputs["decoder_batch_ids"] = paddle.full( - [self.fd_config.parallel_config.max_num_seqs, 1], 0, dtype='int32') - self.share_inputs["decoder_tile_ids_per_batch"] = paddle.full( - [self.fd_config.parallel_config.max_num_seqs, 1], 0, dtype='int32') - # initialize_forward_meta - self.forward_meta = ForwardMeta( - input_ids=self.share_inputs["input_ids"], - ids_remove_padding=self.share_inputs["ids_remove_padding"], - rotary_embs=self.share_inputs["rope_emb"], - attn_backend=self.attn_backend, - decoder_batch_ids=self.share_inputs["decoder_batch_ids"], - decoder_tile_ids_per_batch=self.share_inputs["decoder_tile_ids_per_batch"], - seq_lens_encoder=self.share_inputs["seq_lens_encoder"], - seq_lens_decoder=self.share_inputs["seq_lens_decoder"], - seq_lens_this_time=self.share_inputs["seq_lens_this_time"], - cum_offsets=self.share_inputs["cum_offsets"], - padding_offset=self.share_inputs["padding_offset"], - cu_seqlens_q=self.share_inputs["cu_seqlens_q"], - cu_seqlens_k=self.share_inputs["cu_seqlens_k"], - block_tables=self.share_inputs["block_tables"], - caches=self.share_inputs["caches"] - ) - self.attn_backend.init_attention_metadata(self.forward_meta) - - self.sampling_metadata = SamplingMetadata( - temperature=self.share_inputs["temperature"], - top_p=self.share_inputs["top_p"], - step_idx=self.share_inputs["step_idx"], - pre_token_ids=self.share_inputs["pre_ids"], - 
frequency_penalties=self.share_inputs["frequency_score"], - presence_penalties=self.share_inputs["presence_score"], - repetition_penalties=self.share_inputs["penalty_score"], - min_dec_lens=self.share_inputs["min_dec_len"], - bad_words_token_ids=self.share_inputs["bad_tokens"], - eos_token_ids=self.share_inputs["eos_token_id"], - ) - - def generate(self) -> None: - """ - generate - """ - self.pre_process() - hiddden_states = self.model(self.share_inputs["ids_remove_padding"], - self.share_inputs["image_features"], - self.forward_meta) - logits = self.model.compute_logits(hiddden_states) - set_value_by_flags_and_idx( - self.share_inputs["pre_ids"], - self.share_inputs["input_ids"], - self.share_inputs["seq_lens_this_time"], - self.share_inputs["seq_lens_encoder"], - self.share_inputs["seq_lens_decoder"], - self.share_inputs["step_idx"], - self.share_inputs["stop_flags"], - ) - # sampler & save_output - next_tokens = self.sampler(logits, self.sampling_metadata) - if self.fd_config.parallel_config.tensor_parallel_degree > 1: - paddle.distributed.broadcast(next_tokens, 0) - self.post_process(next_tokens) - - def post_process(self, next_tokens: paddle.Tensor) -> None: - """ - post_process - """ - if self.share_inputs["enable_thinking"]: - exists_think_end = next_tokens == self.model_cfg.think_end_id - paddle.assign( - paddle.where( - exists_think_end, - self.share_inputs["need_think_end"] - 1, - self.share_inputs["need_think_end"], - ), self.share_inputs["need_think_end"]) - - paddle.assign( - paddle.where( - self.share_inputs["need_think_end"].cast("bool"), - self.share_inputs["reasoning_index"] - 1, - self.share_inputs["reasoning_index"], - ), self.share_inputs["reasoning_index"]) - - stop_wo_think = ( - (next_tokens == self.share_inputs["eos_token_id"]) | - (self.share_inputs["reasoning_index"] == 0)) & ( - self.share_inputs["need_think_end"] > 0) - next_tokens = paddle.where(stop_wo_think, - self.model_cfg.think_end_id, - next_tokens) - paddle.assign( - paddle.where( - stop_wo_think, - self.share_inputs["need_think_end"] - 1, - self.share_inputs["need_think_end"], - ), self.share_inputs["need_think_end"]) - paddle.assign( - paddle.where( - self.share_inputs["stop_flags"], - self.share_inputs["step_idx"], - self.share_inputs["step_idx"] + 1, - ), - self.share_inputs["step_idx"], - ) - length_cond = paddle.greater_equal(self.share_inputs["step_idx"], - self.share_inputs["max_dec_len"]) - paddle.assign( - paddle.logical_or(self.share_inputs["stop_flags"], length_cond), - self.share_inputs["stop_flags"], - ) - - set_stop_value_multi_ends( - next_tokens, - self.share_inputs["stop_flags"], - self.share_inputs["seq_lens_this_time"], - self.share_inputs["eos_token_id"], - self.share_inputs["next_tokens"], - False, - ) # multi ends - # update inputs - with paddle.framework._no_check_dy2st_diff(): - update_inputs( - self.share_inputs["stop_flags"], - self.share_inputs["not_need_stop"], - self.share_inputs["seq_lens_this_time"], - self.share_inputs["seq_lens_encoder"], - self.share_inputs["seq_lens_decoder"], - self.share_inputs["input_ids"], - self.share_inputs["stop_nums"], - next_tokens, - self.share_inputs["is_block_step"], - ) - save_output( - next_tokens, - self.share_inputs["not_need_stop"], - self.rank, - False, # use_ep - ) - - def _cal_theortical_kvcache(self): - """ - Calculate the size of kvcache for computational theory - """ - num_layers = self.model_cfg.get("num_layers", - None) or self.model_cfg.get( - "num_hidden_layers", None) - byte_of_cache = 2 - # support c8 c4 - - hidden_dim = 
self.model_cfg.head_dim * self.model_cfg.kv_num_head - theoretical_kv_cache_memory = (2 * byte_of_cache * - self.args.block_size * num_layers * - hidden_dim) - return theoretical_kv_cache_memory - - def _update_share_input_block_num(self): - """ - Update share_inputs['block_tables'] and share_inputs['free_list'] - """ - num_gpu_blocks = self.num_gpu_blocks - - del self.share_inputs["caches"] - self._init_kvcache() - - del self.share_inputs["block_tables"] - self.share_inputs["block_tables"] = paddle.full( - [self.args.max_num_seqs, num_gpu_blocks], -1, dtype="int32") - - # Init free list - free_list = list( - range(num_gpu_blocks - 1, - int(num_gpu_blocks * self.args.kv_cache_ratio) - 1, -1)) - self.free_list_len = len(free_list) - self.share_inputs.update({ - "free_list": - paddle.to_tensor(free_list, dtype="int32"), - "free_list_len": - paddle.full([1], self.free_list_len, dtype="int32"), - }) - - def dummy_input(self, num_total_tokens: int, number_of_tasks: int) -> None: - """ - fake input to profile - """ - input_length = min(num_total_tokens // number_of_tasks, - self.args.max_model_len - 10) - block_num = (input_length + self.args.block_size - 1 ) // self.args.block_size \ - + self.args.enc_dec_block_num - self.share_inputs["free_list"] = paddle.to_tensor([], dtype="int32") - self.share_inputs["free_list_len"][0] = 0 - - for i in range(number_of_tasks): - idx = i - self.share_inputs["input_ids"][idx:idx + - 1, :input_length] = np.array( - [5] * input_length) - self.share_inputs["eos_token_id"][:] = np.array( - [2], dtype="int64").reshape(-1, 1) - self.share_inputs["seq_lens_this_time"][idx:idx + 1] = input_length - self.share_inputs["step_seq_lens_encoder"][idx:idx + - 1] = input_length - self.share_inputs["seq_lens_encoder"][idx:idx + 1] = input_length - self.share_inputs["seq_lens_decoder"][idx:idx + 1] = 0 - self.share_inputs["step_idx"][idx:idx + 1] = 0 - self.share_inputs["max_dec_len"][idx:idx + 1] = 10 - self.share_inputs["stop_flags"][idx:idx + 1] = False - - self.share_inputs["first_token_ids"][ - idx:idx + 1] = self.share_inputs["input_ids"][idx:idx + 1, :1] - self.share_inputs["ori_seq_lens_encoder"][idx:idx + - 1] = input_length - - self.share_inputs["infer_seed"][idx:idx + 1] = random.randint( - 0, 922337203685477580) - self.share_inputs["encoder_block_lens"][idx:idx + 1] = block_num - self.share_inputs["block_tables"][idx : idx + 1, :block_num] = np.arange(idx * block_num, \ - (idx + 1) * block_num, 1) - - def _preprocess_task(self, one: dict) -> None: - """process batch""" - - input_ids = one["input_ids"][np.newaxis, :] - input_ids = paddle.to_tensor(input_ids, dtype=paddle.int64) - token_type_ids = one["token_type_ids"][np.newaxis, :] - token_type_ids = paddle.to_tensor(token_type_ids, dtype=paddle.int64) - - if one["images"] is not None: - image_type_ids = one["image_type_ids"][np.newaxis, :] - images = one["images"] - image_type_ids = paddle.to_tensor(image_type_ids, - dtype=paddle.int64) - images = paddle.to_tensor(images, dtype="uint8") - grid_thw = paddle.to_tensor(one["grid_thw"], dtype="int64") - else: - image_type_ids = None - images = None - grid_thw = None - - if one["position_ids"] is not None: - position_ids = paddle.to_tensor(one["position_ids"], - dtype="int64").unsqueeze([0]) - else: - position_ids = None - - result = dict( - input_ids=input_ids, - image_type_ids=image_type_ids, - token_type_ids=token_type_ids, - position_ids=position_ids, - grid_thw=grid_thw, - images=images, - ) - return result - - -def build_stream_line_model( - model_path: str, - 
dtype: str, - block_size: int, - max_model_len: int, - tokenizer: ErnieBotTokenizer, - quantization: str = "None", - graph_opt_config: Optional[GraphOptimizationConfig] = None -) -> tuple[FDConfig, paddle.nn.layer]: - """ - build model - """ - import contextlib - - from paddleformers.transformers.configuration_utils import PretrainedConfig - from paddleformers.trl import llm_utils - from paddleformers.utils.log import logger - - from fastdeploy.model_executor.layers.quantization import \ - get_quantization_config - from fastdeploy.model_executor.models.model_base import ModelRegistry - - config, _ = PretrainedConfig.get_config_dict(model_path) - config["head_dim"] = config.get( - "head_dim", config["hidden_size"] // config["num_attention_heads"]) - config["rope_theta"] = config.get("rope_theta", 10000.0) - rope_theta = config["rope_theta"] - model_config = ModelConfig.from_dict(config) - model_config.head_dim = config["head_dim"] - - parallel_config = ParallelConfig() - speculative_config = SpeculativeConfig() - device_config = DeviceConfig() - load_config = LoadConfig() - moe_config = MoEConfig() - kv_cache_config = KVCacheConfig() - kv_cache_config.cache_quant_dtype = "none" - - tensor_parallel_rank, tensor_parallel_degree = llm_utils.init_dist_env() - parallel_config.tensor_parallel_rank = tensor_parallel_rank - parallel_config.tensor_parallel_degree = tensor_parallel_degree - parallel_config.tensor_parallel_degree = tensor_parallel_degree - parallel_config.expert_parallel_degree = 1 - parallel_config.expert_parallel_rank = int(tensor_parallel_rank / - tensor_parallel_degree) - parallel_config.column_cut = False - - speculative_config.is_mtp = False - speculative_config.draft_type = "None" - - # Note(tangbinhan): used for load_checkpoint - model_config.tensor_parallel_rank = parallel_config.tensor_parallel_rank - model_config.tensor_parallel_degree = parallel_config.tensor_parallel_degree - model_config.is_mtp = speculative_config.is_mtp - moe_config.num_experts = None - - # use the length of tokenizer as the origin vocab size - ori_vocab_size = len(tokenizer) - moe_intermediate_size = (config.get("moe_intermediate_size", None), ) - if isinstance(moe_intermediate_size, list) or isinstance( - moe_intermediate_size, tuple): - moe_intermediate_size = moe_intermediate_size[0] - - num_key_value_heads = config.get("num_key_value_heads", -1) - if num_key_value_heads is None: - num_key_value_heads = -1 - - # RL need, some model num_key_value_heads less tensor_parallel_degree, need copy - if num_key_value_heads < tensor_parallel_degree: - logger.warning( - f"key value heads num is {num_key_value_heads}, tensor parallel degree is {tensor_parallel_degree}" - ) - num_key_value_heads = tensor_parallel_degree - - if config.get("ffn_hidden_size", None) is not None: - ffn_hidden_size = config["ffn_hidden_size"] - elif config.get("intermediate_size", None) is not None: - ffn_hidden_size = config["intermediate_size"] - else: - ffn_hidden_size = 4 * config["hidden_size"] - if config["hidden_act"].lower() == "swiglu": - if paddle.distributed.get_world_size() > 1: - multiple_of = 8 * config["num_attention_heads"] - else: - multiple_of = 4 * config["num_attention_heads"] - ffn_hidden_size = multiple_of * ( - (int(2 * ffn_hidden_size / 3) + multiple_of - 1) // - multiple_of) - - num_layers = config.get("num_layers", None) or config.get( - "num_hidden_layers", None) - if num_layers is None: - raise ValueError(f"num_layers<{num_layers}> is invalid") - - remove_tail_layer = config.get("remove_tail_layer") - if 
remove_tail_layer is True: - num_layers -= 1 - elif isinstance(remove_tail_layer, int): - num_layers -= remove_tail_layer - - moe_num_experts = config.get("moe_num_experts", 0) - if isinstance(moe_num_experts, list): - moe_num_experts = max(moe_num_experts) - use_moe = moe_num_experts > 0 - - context = contextlib.nullcontext() - - if config["hidden_act"].lower() == "swiglu": - model_config.hidden_act = "swiglu" - model_config.ffn_hidden_size = ffn_hidden_size - model_config.max_seq_len = max_model_len - model_config.num_layers = num_layers - model_config.dtype = dtype - parallel_config.block_size = block_size - - parallel_config.msg_queue_id = None - model_config.num_key_value_heads = num_key_value_heads - model_config.return_all_hidden_states = False - speculative_config.draft_type = "None" - model_config.start_layer_index = 0 - if use_moe: - moe_config.num_experts = config.get("moe_num_experts", None) - moe_config.moe_intermediate_size = config.get("moe_intermediate_size", - None) - moe_config.top_k = config.get("moe_topk", 8) - moe_config.moe_num_shared_experts = config.get( - "moe_num_shared_experts", 0) - moe_config.moe_layer_start_index = config.get("moe_layer_start_index", - None) - moe_config.moe_layer_end_index = config.get("moe_layer_end_index", - None) - - model_config.moe_phase = MoEPhase(phase="prefill") - model_config.ori_vocab_size = ori_vocab_size - - quantization_config = config.get("quantization_config", None) - - quant_config_name = None - if quantization_config is not None and quantization_config.get( - "quantization", None) is None: - raise ValueError( - "quantization_config should have a key named 'quantization' for specify quant config." - ) - - if quantization_config is not None: - quant_config_name = quantization_config["quantization"] - quant_cls = get_quantization_config(quant_config_name) - quant_config = quant_cls.from_config(quantization_config) - elif quantization != "None": - quantization_config = {} - if use_moe and quantization == "wint4": - quantization_config["dense_quant_type"] = "wint8" - quantization_config["moe_quant_type"] = "wint4" - quant_config_name = "mix_quant" - else: - quant_config_name = quantization - quant_cls = get_quantization_config(quant_config_name) - quant_config = quant_cls.from_config(quantization_config) - else: - quant_config = None - - logger.info("===========quantization_config==============") - if quant_config is not None: - logger.info(f"{quantization_config}") - else: - logger.info( - "No quantization config found and use original weight and act dtype." 
- ) - logger.info("============================================") - - fd_config = FDConfig( - model_config=model_config, - parallel_config=parallel_config, - speculative_config=speculative_config, - device_config=device_config, - load_config=load_config, - moe_config=moe_config, - quant_config=quant_config, - kv_cache_config=kv_cache_config, - graph_opt_config=graph_opt_config, - ) - fd_config.parallel_config.max_model_len = max_model_len - fd_config.model_config.rope_theta = rope_theta - - with context: - model_cls = ModelRegistry.get_class(model_config.architectures[0]) - model = model_cls(fd_config) - - model.eval() - return fd_config, model From e414faed2ba969f0cc4b9a8587bfb1e056331d23 Mon Sep 17 00:00:00 2001 From: Wanglongzhi2001 <583087864@qq.com> Date: Thu, 24 Jul 2025 16:23:24 +0800 Subject: [PATCH 07/10] update --- fastdeploy/model_executor/layers/moe/ep.py | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/fastdeploy/model_executor/layers/moe/ep.py b/fastdeploy/model_executor/layers/moe/ep.py index 3c196306e2..e2e87beb2c 100644 --- a/fastdeploy/model_executor/layers/moe/ep.py +++ b/fastdeploy/model_executor/layers/moe/ep.py @@ -194,7 +194,21 @@ def low_latency_combine( Return: combined_hidden_states: [num_tokens, hidden] """ - + # TODO(@wufeisheng): Delete them when deepep in PaddlePaddle is fixed + ( + src_info, + layout_range, + num_max_dispatch_tokens_per_rank, + num_experts, + ) = handle + handle = ( + src_info, + layout_range, + num_max_dispatch_tokens_per_rank, + None, + num_experts, + ) + combined_hidden_states, _, combine_hook = self.decode_deepep_engine.low_latency_combine( hidden_states, topk_idx, From 84afd9961a11357924c31d6d9ad358bfbc6445e0 Mon Sep 17 00:00:00 2001 From: Wanglongzhi2001 <583087864@qq.com> Date: Thu, 24 Jul 2025 16:26:54 +0800 Subject: [PATCH 08/10] fix typo --- fastdeploy/worker/worker_process.py | 9 --------- 1 file changed, 9 deletions(-) diff --git a/fastdeploy/worker/worker_process.py b/fastdeploy/worker/worker_process.py index 2fb52926f6..32373b3083 100644 --- a/fastdeploy/worker/worker_process.py +++ b/fastdeploy/worker/worker_process.py @@ -648,15 +648,6 @@ def initialize_fd_config(args, ranks: int = 1, local_rank: int = 0) -> FDConfig: if getattr(model_config, "num_hidden_layers", None) is None: raise ValueError("num_hidden_layers is None") - # Set MoE phase based on splitwise role - if parallel_config.splitwise_role == "mixed": - parallel_config.moe_phase.phase = "prefill" - elif parallel_config.splitwise_role == "prefill": - parallel_config.moe_phase = "prefill" - elif parallel_config.splitwise_role == "decode": - parallel_config.moe_phase = "decode" - elif parallel_config.splitwise_role is not None: - raise NotImplementedError quantization_config = model_config.quantization_config if not model_config.is_quantized: From 7299c1efdcf69e6e560143abaaa68fd716c506de Mon Sep 17 00:00:00 2001 From: Wanglongzhi2001 <583087864@qq.com> Date: Thu, 24 Jul 2025 16:36:43 +0800 Subject: [PATCH 09/10] fix code style --- fastdeploy/config.py | 2 +- fastdeploy/model_executor/layers/moe/ep.py | 9 +++-- .../layers/moe/fused_moe_backend_base.py | 36 ++++++++++++------- fastdeploy/worker/gpu_model_runner.py | 6 ++-- 4 files changed, 31 insertions(+), 22 deletions(-) diff --git a/fastdeploy/config.py b/fastdeploy/config.py index f610239665..dd8a05a008 100644 --- a/fastdeploy/config.py +++ b/fastdeploy/config.py @@ -18,7 +18,6 @@ import os from dataclasses import dataclass, field -from enum import Enum from typing import Literal, 
Optional from paddleformers.transformers.configuration_utils import PretrainedConfig @@ -34,6 +33,7 @@ class MoEPhase: """ The generation phase of the moe. """ + def __init__(self, phase="prefill"): self._phase = phase diff --git a/fastdeploy/model_executor/layers/moe/ep.py b/fastdeploy/model_executor/layers/moe/ep.py index e2e87beb2c..d48c01538c 100644 --- a/fastdeploy/model_executor/layers/moe/ep.py +++ b/fastdeploy/model_executor/layers/moe/ep.py @@ -74,7 +74,7 @@ def __init__( self.ep_config = Config(24, 6, 256) self.num_max_dispatch_tokens_per_rank = num_max_dispatch_tokens_per_rank - # In mixed EP mode on a single node, we dynamically switch between + # In mixed EP mode on a single node, we dynamically switch between # high throughput and low latency modes. if splitwise_role == "mixed": # decode engine @@ -88,7 +88,7 @@ def __init__( low_latency_mode=False, num_qps_per_rank=1, ) - # In disaggregated mode on mutiple nodes, we either use + # In disaggregated mode on mutiple nodes, we either use # high throughput mode or low latency mode. else: if moe_phase.phase == "decode": @@ -105,7 +105,6 @@ def __init__( else: raise ValueError(f"Unknown generation phase {moe_phase}") - def get_low_latency_buffer(self): """ Get the DeepEP buffer. @@ -194,7 +193,7 @@ def low_latency_combine( Return: combined_hidden_states: [num_tokens, hidden] """ - # TODO(@wufeisheng): Delete them when deepep in PaddlePaddle is fixed + # TODO(@wufeisheng): Delete them when deepep in PaddlePaddle is fixed ( src_info, layout_range, @@ -208,7 +207,7 @@ def low_latency_combine( None, num_experts, ) - + combined_hidden_states, _, combine_hook = self.decode_deepep_engine.low_latency_combine( hidden_states, topk_idx, diff --git a/fastdeploy/model_executor/layers/moe/fused_moe_backend_base.py b/fastdeploy/model_executor/layers/moe/fused_moe_backend_base.py index 8a2a83ad17..f9ea275d79 100644 --- a/fastdeploy/model_executor/layers/moe/fused_moe_backend_base.py +++ b/fastdeploy/model_executor/layers/moe/fused_moe_backend_base.py @@ -19,8 +19,6 @@ import paddle from paddle import nn -from fastdeploy.config import MoEPhase - from ..quantization.quant_base import QuantMethodBase @@ -46,11 +44,16 @@ def init_ep(self, layer: nn.Layer) -> None: """ if layer.ep_size > 1: if layer.fd_config.parallel_config.splitwise_role == "mixed": - from .ep import EPPrefillRunner, EPDecoderRunner + from .ep import EPDecoderRunner, EPPrefillRunner + self.ep_prefill_runner = EPPrefillRunner( - layer.top_k, layer.hidden_size, layer.num_experts, + layer.top_k, + layer.hidden_size, + layer.num_experts, layer.fd_config.parallel_config.splitwise_role, - layer.ep_size, layer.ep_rank) + layer.ep_size, + layer.ep_rank, + ) self.ep_decoder_runner = EPDecoderRunner( layer.top_k, layer.hidden_size, @@ -64,17 +67,27 @@ def init_ep(self, layer: nn.Layer) -> None: else: if layer.fd_config.parallel_config.moe_phase == "prefill": from .ep import EPPrefillRunner + self.ep_prefill_runner = EPPrefillRunner( - layer.top_k, layer.hidden_size, layer.num_experts, + layer.top_k, + layer.hidden_size, + layer.num_experts, layer.fd_config.parallel_config.splitwise_role, - layer.ep_size, layer.ep_rank) + layer.ep_size, + layer.ep_rank, + ) else: from .ep import EPDecoderRunner + self.ep_decoder_runner = EPDecoderRunner( - layer.top_k, layer.hidden_size, layer.num_experts, + layer.top_k, + layer.hidden_size, + layer.num_experts, layer.moe_config.num_max_dispatch_tokens_per_rank, layer.fd_config.parallel_config.splitwise_role, - layer.ep_size, layer.ep_rank) + layer.ep_size, + 
layer.ep_rank, + ) def process_loaded_weights(self, layer, weights) -> None: """ @@ -150,10 +163,7 @@ def apply( if layer.ep_size > 1: if layer.fd_config.parallel_config.moe_phase.phase == "prefill": return self.apply_ep_prefill(layer, x, gate_out) - elif layer.fd_config.parallel_config.moe_phase.phase == "decode": - return self.apply_ep_decode(layer, x, gate_out) else: - logger.error( - f"invalid value of moe_phase={layer.fd_config.parallel_config.moe_phase.phase}") + return self.apply_ep_decode(layer, x, gate_out) else: return self.apply_tp(layer, x, gate_out) diff --git a/fastdeploy/worker/gpu_model_runner.py b/fastdeploy/worker/gpu_model_runner.py index 996d1ff04f..ca73961408 100644 --- a/fastdeploy/worker/gpu_model_runner.py +++ b/fastdeploy/worker/gpu_model_runner.py @@ -794,16 +794,16 @@ def initialize_forward_meta(self): # Update Batch type for cuda graph # TODO(gongshaotian): Use seq_lens_encoder to set is_decode_batch is_decode_batch = not ((self.share_inputs["seq_lens_this_time"] > 1).sum() > 0) - + # mix ep in single node if self.fd_config.parallel_config.use_ep and self.fd_config.parallel_config.splitwise_role == "mixed": is_decode_batch_list = [] paddle.distributed.all_gather_object(is_decode_batch_list, is_decode_batch) is_decode_batch = all(is_decode_batch_list) self.fd_config.parallel_config.moe_phase.phase = "decode" if is_decode_batch else "prefill" - + self.forward_meta.step_use_cudagraph = self.use_cudagraph and is_decode_batch - + # Initialzie attention meta data for attn_backend in self.attn_backends: attn_backend.init_attention_metadata(self.forward_meta) From c81a3f91c5ab2dce7baae37db3a53222a73bef62 Mon Sep 17 00:00:00 2001 From: Wanglongzhi2001 <583087864@qq.com> Date: Thu, 24 Jul 2025 23:11:21 +0800 Subject: [PATCH 10/10] fix conflict --- fastdeploy/model_executor/layers/moe/ep.py | 5 ++--- .../model_executor/layers/moe/fused_moe_backend_base.py | 3 +++ 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/fastdeploy/model_executor/layers/moe/ep.py b/fastdeploy/model_executor/layers/moe/ep.py index d48c01538c..b1ab322a1e 100644 --- a/fastdeploy/model_executor/layers/moe/ep.py +++ b/fastdeploy/model_executor/layers/moe/ep.py @@ -83,7 +83,7 @@ def __init__( # prefill engine self.prefill_deepep_engine = deep_ep.Buffer( self.group, - int(1e9), + int(5e8), 0, low_latency_mode=False, num_qps_per_rank=1, @@ -97,7 +97,7 @@ def __init__( elif moe_phase.phase == "prefill": self.prefill_deepep_engine = deep_ep.Buffer( self.group, - int(1e9), + int(5e8), 0, low_latency_mode=False, num_qps_per_rank=1, @@ -261,7 +261,6 @@ def __init__( num_max_dispatch_tokens_per_rank=num_max_dispatch_tokens_per_rank, hidden=hidden, num_experts=num_experts + redundant_experts_num, - moe_phase=moe_phase, ep_size=ep_size, ep_rank=ep_rank, splitwise_role=splitwise_role, diff --git a/fastdeploy/model_executor/layers/moe/fused_moe_backend_base.py b/fastdeploy/model_executor/layers/moe/fused_moe_backend_base.py index f9ea275d79..0f6b38586c 100644 --- a/fastdeploy/model_executor/layers/moe/fused_moe_backend_base.py +++ b/fastdeploy/model_executor/layers/moe/fused_moe_backend_base.py @@ -53,6 +53,7 @@ def init_ep(self, layer: nn.Layer) -> None: layer.fd_config.parallel_config.splitwise_role, layer.ep_size, layer.ep_rank, + layer.fd_config.model_config.redundant_experts_num, ) self.ep_decoder_runner = EPDecoderRunner( layer.top_k, @@ -75,6 +76,7 @@ def init_ep(self, layer: nn.Layer) -> None: layer.fd_config.parallel_config.splitwise_role, layer.ep_size, layer.ep_rank, + 
layer.fd_config.model_config.redundant_experts_num, ) else: from .ep import EPDecoderRunner @@ -87,6 +89,7 @@ def init_ep(self, layer: nn.Layer) -> None: layer.fd_config.parallel_config.splitwise_role, layer.ep_size, layer.ep_rank, + layer.fd_config.model_config.redundant_experts_num, ) def process_loaded_weights(self, layer, weights) -> None:
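
For reference, the core behaviour this series adds — a runtime-switchable MoEPhase plus the rule that a mixed-EP step only runs in decode (low-latency) mode when every EP rank agrees the batch is decode-only — can be summarised in a small standalone sketch. This is an illustration, not the FastDeploy implementation: the helper resolve_mixed_ep_phase and the plain-Python stand-in for paddle.distributed.all_gather_object are assumed names used only for this example.

    # Standalone sketch of the mixed-EP phase switching introduced by this series.
    # resolve_mixed_ep_phase is an illustrative helper, not a FastDeploy API.
    from typing import List


    class MoEPhase:
        """Generation phase of the MoE layers, switchable at runtime."""

        def __init__(self, phase: str = "prefill"):
            self._phase = phase

        @property
        def phase(self) -> str:
            return self._phase

        @phase.setter
        def phase(self, value: str) -> None:
            if value not in ("prefill", "decode"):
                raise ValueError(
                    f"The moe_phase is invalid, only support prefill and decode, but got {value}"
                )
            self._phase = value


    def resolve_mixed_ep_phase(seq_lens_this_time_per_rank: List[List[int]]) -> str:
        """Mixed-EP rule: the step runs in decode mode only when every EP rank sees a
        pure decode batch (no request with seq_len_this_time > 1). In the real runner
        the per-rank flags are exchanged with paddle.distributed.all_gather_object."""
        is_decode_per_rank = [
            not any(seq_len > 1 for seq_len in seq_lens)
            for seq_lens in seq_lens_this_time_per_rank
        ]
        return "decode" if all(is_decode_per_rank) else "prefill"


    if __name__ == "__main__":
        moe_phase = MoEPhase("prefill")
        # Rank 0 is decoding but rank 1 still has a prefill chunk -> whole step stays in prefill mode.
        moe_phase.phase = resolve_mixed_ep_phase([[1, 1], [1, 128]])
        assert moe_phase.phase == "prefill"
        # Every rank is decoding -> switch the shared config to decode mode.
        moe_phase.phase = resolve_mixed_ep_phase([[1, 1], [1, 1]])
        assert moe_phase.phase == "decode"
        print("phase after pure-decode step:", moe_phase.phase)

In the patches above, the resulting phase decides which runner handles the MoE layers for that step: EPPrefillRunner, backed by the high-throughput DeepEP buffer allocated with int(5e8) bytes, or EPDecoderRunner, backed by the low-latency buffer; with splitwise_role == "mixed" both engines are created up front so the phase can flip per step.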