Commit c6de8a7

fix ci
1 parent 8eda960 commit c6de8a7

5 files changed: +33 −22 lines changed

fastdeploy/engine/common_engine.py

Lines changed: 8 additions & 2 deletions
@@ -198,7 +198,11 @@ def start_worker_queue_service(self):
             local_data_parallel_size=self.cfg.parallel_config.data_parallel_size,
         )
 
-        if self.cfg.cache_config.enable_prefix_caching or self.cfg.splitwise_role != "mixed":
+        if (
+            self.cfg.cache_config.enable_prefix_caching
+            or self.cfg.splitwise_role != "mixed"
+            and self.cfg.parallel_config.local_data_parallel_id == 0
+        ):
             self.cache_task_queue = EngineCacheQueue(
                 address=(
                     self.cfg.master_ip,
@@ -726,7 +730,9 @@ def start_cache_service(self, device_ids, ipc_signal_suffix):
             tensor_parallel_size=self.cfg.tensor_parallel_size,
             device_ids=device_ids,
             pod_ip=self.cfg.master_ip,
-            engine_worker_queue_port=self.cfg.engine_worker_queue_port,
+            engine_worker_queue_port=int(
+                self.cfg.engine_worker_queue_port[self.cfg.parallel_config.local_data_parallel_id]
+            ),
             pid_suffix=ipc_signal_suffix,
         )
 
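
Note on the new condition: Python's "and" binds tighter than "or", so the rewritten check creates the cache task queue when prefix caching is enabled, or when the role is not "mixed" and this is data-parallel rank 0. A minimal, self-contained sketch of that grouping (the helper name is invented for illustration):

# Hypothetical helper, not from the repo: mirrors the grouping of the patched condition.
def should_create_cache_queue(enable_prefix_caching, splitwise_role, local_dp_id):
    # Evaluates as: enable_prefix_caching or (splitwise_role != "mixed" and local_dp_id == 0)
    return enable_prefix_caching or splitwise_role != "mixed" and local_dp_id == 0

assert should_create_cache_queue(True, "mixed", 3) is True      # prefix caching alone suffices
assert should_create_cache_queue(False, "prefill", 0) is True   # non-mixed role on DP rank 0
assert should_create_cache_queue(False, "prefill", 1) is False  # non-mixed role, other DP ranks
assert should_create_cache_queue(False, "mixed", 0) is False    # mixed role without prefix caching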

fastdeploy/engine/engine.py

Lines changed: 4 additions & 6 deletions
@@ -109,8 +109,7 @@ def start(self, api_server_pid=None):
         start_time = time.time()
 
         self.api_server_pid = api_server_pid
-        self.engine_pid = os.getpid()
-        self.ipc_signal_suffix = self.engine_pid if self.api_server_pid is None else self.api_server_pid
+        self.ipc_signal_suffix = self.cfg.engine_worker_queue_port[0]
         self._init_worker_signals()
 
         self.data_processor = self.input_processor.create_processor()
@@ -445,7 +444,7 @@ def _start_worker_service(self):
             f" --enc_dec_block_num {self.cfg.cache_config.enc_dec_block_num}"
             f" --eos_tokens_lens {self.data_processor.eos_token_id_len}"
             f" --pad_token_id {self.data_processor.pad_token_id}"
-            f" --engine_pid {self.engine_pid}"
+            f" --engine_pid {self.cfg.engine_worker_queue_port[0]}"
             f" --max_num_batched_tokens {self.cfg.max_num_batched_tokens}"
             f" --splitwise_role {self.cfg.splitwise_role}"
             f" --kv_cache_ratio {self.cfg.cache_config.kv_cache_ratio}"
@@ -600,20 +599,19 @@ def launch_components(self):
             self.engine.scheduler.start(role, host_ip, disaggregate)
 
         if not envs.FD_ENABLE_MULTI_API_SERVER:
-            time.sleep(1)
             if self.cfg.parallel_config.enable_expert_parallel and self.cfg.parallel_config.data_parallel_size > 1:
                 self.dp_processed = []
                 for i in range(
                     1,
                     self.cfg.parallel_config.data_parallel_size // self.cfg.nnode,
                 ):
-                    time.sleep(1)
+                    time.sleep(3)
                     self.dp_processed.append(
                         multiprocessing.Process(
                             target=start_data_parallel_service,
                             args=(
                                 self.cfg,
-                                i + self.cfg.node_rank * self.cfg.worker_num_per_node,
+                                i,
                             ),
                         )
                     )
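
For context, a hedged sketch (values invented) of the idea behind the first two hunks: both the engine's ipc_signal_suffix and the --engine_pid flag passed to the worker are now derived from engine_worker_queue_port[0], so the two processes compute the same, restart-stable suffix instead of relying on the engine's PID. The patch indexes engine_worker_queue_port, which implies the config field is a per-rank list.

# Illustrative values only, not taken from the repository.
engine_worker_queue_port = [8002, 8003]

ipc_signal_suffix = engine_worker_queue_port[0]              # replaces os.getpid()
worker_args = f"--engine_pid {engine_worker_queue_port[0]}"  # the worker receives the same value

# Any shared-memory signal named with this suffix is now predictable on both sides.
print(ipc_signal_suffix, worker_args)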

fastdeploy/engine/expert_service.py

Lines changed: 2 additions & 1 deletion
@@ -62,6 +62,7 @@ def __init__(self, cfg, local_data_parallel_id):
             )
         else:
             self.cfg.cache_config.pd_comm_port = [self.cfg.cache_config.pd_comm_port[local_data_parallel_id]]
+        self.cfg.parallel_config.local_data_parallel_id = local_data_parallel_id
 
         self.engine = EngineSevice(self.cfg)
         if self.cfg.scheduler_config.name == "splitwise":
@@ -83,7 +84,7 @@ def start(self, ipc_signal_suffix, local_data_parallel_id):
             self.api_server_pid = ipc_signal_suffix
             self.engine.start_zmq_service(ipc_signal_suffix)
         else:
-            ipc_signal_suffix = os.getpid()
+            ipc_signal_suffix = self.cfg.engine_worker_queue_port[0]
 
         llm_logger.info(f"start expert service {local_data_parallel_id}")
         if self.cfg.splitwise_role != "mixed":
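
A simplified, hedged sketch of what the added line in __init__ achieves: each per-rank expert service records its data-parallel id on the config it hands to EngineSevice, so downstream code (such as the cache-queue condition in common_engine.py) can check the rank. The helper and the SimpleNamespace stand-ins below are illustrative, not part of the repo.

from types import SimpleNamespace

def prepare_rank_config(cfg, local_data_parallel_id):
    # Stamp the rank onto the parallel config, as the added line does.
    cfg.parallel_config.local_data_parallel_id = local_data_parallel_id
    return cfg

cfg = SimpleNamespace(parallel_config=SimpleNamespace(local_data_parallel_id=0))
prepare_rank_config(cfg, 2)
print(cfg.parallel_config.local_data_parallel_id)  # 2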

fastdeploy/inter_communicator/ipc_signal.py

Lines changed: 3 additions & 1 deletion
@@ -78,7 +78,9 @@ def __init__(
             name = name + f".{suffix}"
 
         if create:
-            assert not shared_memory_exists(name), f"ShareMemory: {name} already exists"
+            if shared_memory_exists(name):
+                print(f"ShareMemory: {name} already exists, delete it")
+                SharedMemory(name=name, create=False).unlink()
             self.shm = SharedMemory(create=True, size=array.nbytes, name=name)
             self.value: np.ndarray = np.ndarray(array.shape, dtype=array.dtype, buffer=self.shm.buf)
             self.value[:] = array  # Initialize with input array data
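
The change replaces a hard assert with "unlink the stale segment, then create a fresh one". A standalone sketch of that pattern using only the standard library (segment name and size are invented):

from multiprocessing.shared_memory import SharedMemory

def create_fresh_shm(name: str, size: int) -> SharedMemory:
    try:
        stale = SharedMemory(name=name, create=False)  # attach if a leftover segment exists
    except FileNotFoundError:
        pass
    else:
        stale.close()
        stale.unlink()                                 # remove the stale segment
    return SharedMemory(name=name, create=True, size=size)

shm = create_fresh_shm("demo_signal", 64)
shm.close()
shm.unlink()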

fastdeploy/worker/worker_process.py

Lines changed: 16 additions & 12 deletions
@@ -152,19 +152,7 @@ def __init__(self, fd_config: FDConfig, ranks: int = 1, local_rank: int = 0) ->
         # TODO(gongshaotian): Use worker factory to get worker
         self.worker = get_worker(fd_config=fd_config, local_rank=self.local_rank, rank=self.ranks)
 
-        # Initialize task queue
-        task_address = (
-            self.parallel_config.pod_ip,
-            self.parallel_config.engine_worker_queue_port,
-        )
         self.max_chips_per_node = 16 if current_platform.is_iluvatar() else 8
-        self.task_queue = TaskQueue(
-            address=task_address,
-            is_server=False,
-            num_client=self.parallel_config.tensor_parallel_size,
-            client_id=self.parallel_config.tensor_parallel_rank,
-            local_data_parallel_id=self.parallel_config.expert_parallel_rank,
-        )
 
     def init_health_status(self) -> None:
         """
@@ -440,6 +428,20 @@ def init_device(self) -> None:
         """Initialize device and Construct model runner"""
         self.worker.init_device()
 
+    def start_queue_service(self):
+        # Initialize task queue
+        task_address = (
+            self.parallel_config.pod_ip,
+            self.parallel_config.engine_worker_queue_port,
+        )
+        self.task_queue = TaskQueue(
+            address=task_address,
+            is_server=False,
+            num_client=self.parallel_config.tensor_parallel_size,
+            client_id=self.parallel_config.tensor_parallel_rank,
+            local_data_parallel_id=self.parallel_config.expert_parallel_rank,
+        )
+
     def load_model(self) -> None:
         """Load weights and create model"""
@@ -773,6 +775,8 @@ def run_worker_proc() -> None:
     # Initialize health status
     worker_proc.init_health_status()
 
+    worker_proc.start_queue_service()
+
     # Start event loop
     if fd_config.parallel_config.use_ep:
         # TODO(wufeisheng): Delete this branch
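
The net effect is a deferred-initialization pattern: the TaskQueue client is no longer built in __init__ but in an explicit start_queue_service() call made from run_worker_proc() after the health status is set up. A minimal, hedged sketch of that call order (the demo class and placeholder body are not from the repo):

class WorkerProcDemo:
    def __init__(self):
        self.task_queue = None          # nothing connects at construction time

    def init_health_status(self):
        print("health signals ready")

    def start_queue_service(self):
        # The real code builds a TaskQueue client pointed at (pod_ip, engine_worker_queue_port);
        # a placeholder stands in here.
        self.task_queue = object()
        print("task queue connected")

proc = WorkerProcDemo()
proc.init_health_status()
proc.start_queue_service()              # same ordering as run_worker_proc()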
