
Commit c043fcc

fix queue ports idx
1 parent 5a47b97 commit c043fcc

2 files changed, +14 -6 lines

fastdeploy/config.py

Lines changed: 2 additions & 0 deletions

@@ -278,6 +278,8 @@ def __init__(
             logger.info(f"engine_worker_queue_port type is str: {self.engine_worker_queue_port}")
             self.engine_worker_queue_port = [int(port) for port in self.engine_worker_queue_port.split(",")]
             logger.info(f"engine_worker_queue_port: {self.engine_worker_queue_port}")
+        elif isinstance(self.engine_worker_queue_port, int):
+            self.engine_worker_queue_port = [self.engine_worker_queue_port]
         # currently, the expert parallel size is equal data parallel size
         self.expert_parallel_size = self.data_parallel_size
         self.use_ep = self.expert_parallel_size > 1
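
For reference, the normalization this hunk completes accepts both forms the setting can arrive in: a comma-separated string and a bare integer both end up as a list of ints, so later code can always index into it. A minimal standalone sketch of that behaviour (the helper name is hypothetical and not part of the commit, which applies the same logic inline in __init__):

# Sketch only: mirrors the string/int handling shown in the diff above.
def normalize_queue_ports(port):
    """Return engine_worker_queue_port as a list of ints."""
    if isinstance(port, str):
        # "8002,8003" -> [8002, 8003]
        return [int(p) for p in port.split(",")]
    elif isinstance(port, int):
        # 8002 -> [8002]
        return [port]
    return port  # already a list (or None): leave it untouched

assert normalize_queue_ports("8002,8003") == [8002, 8003]
assert normalize_queue_ports(8002) == [8002]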

fastdeploy/worker/worker_process.py

Lines changed: 12 additions & 6 deletions

@@ -192,13 +192,17 @@ def init_health_status(self) -> None:
         )
         self.worker_ready_signal.value[self.local_rank % self.max_chips_per_node] = 1
 
+        if self.parallel_config.local_data_parallel_id == 0:
+            current_suffix = self.parallel_config.engine_pid
+        else:
+            current_suffix = self.parallel_config.engine_worker_queue_port
         # init worker_healthy_live_signal
         workers_alive = np.zeros(shape=[min(array_size, self.parallel_config.tensor_parallel_size)], dtype=np.int32)
         self.worker_healthy_live_signal = IPCSignal(
             name="worker_healthy_live_signal",
             array=workers_alive,
             dtype=np.int32,
-            suffix=self.parallel_config.engine_worker_queue_port,
+            suffix=current_suffix,
             create=False,
         )
         local_rank = self.local_rank % self.parallel_config.tensor_parallel_size
@@ -210,7 +214,7 @@ def init_health_status(self) -> None:
             name="model_weights_status",
             array=workers_model_weights,
             dtype=np.int32,
-            suffix=self.parallel_config.engine_worker_queue_port,
+            suffix=current_suffix,
             create=False,
         )
 
@@ -220,7 +224,7 @@ def init_health_status(self) -> None:
             name="exist_task_signal",
             array=workers_exist_task,
             dtype=np.int32,
-            suffix=self.parallel_config.engine_worker_queue_port,
+            suffix=current_suffix,
             create=False,
         )
 
@@ -230,7 +234,7 @@ def init_health_status(self) -> None:
             name="exist_swapped_task_signal",
             array=workers_swapped_task,
             dtype=np.int32,
-            suffix=self.parallel_config.engine_worker_queue_port,
+            suffix=current_suffix,
             create=False,
         )
 
@@ -240,7 +244,7 @@ def init_health_status(self) -> None:
             name="exist_prefill_task_signal",
             array=exist_prefill_task_signal_data,
             dtype=np.int32,
-            suffix=self.parallel_config.engine_worker_queue_port,
+            suffix=current_suffix,
             create=False,
         )
 
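The hunk at line 192 above picks a single suffix for every worker-side IPC signal instead of always keying them on the queue port: the data-parallel group with local id 0 keeps using the engine PID, and every other group uses its own engine_worker_queue_port. A hedged sketch of just that selection, treating parallel_config as a plain object carrying the three fields shown in the diff (the function name is hypothetical):

# Illustration of the current_suffix choice at the top of init_health_status;
# all five IPCSignal attachments below it pass suffix=current_suffix with
# create=False, presumably so each worker attaches to whatever signal the
# engine side created under the same suffix.
def pick_signal_suffix(parallel_config):
    if parallel_config.local_data_parallel_id == 0:
        # first local data-parallel group: share the engine's PID namespace
        return parallel_config.engine_pid
    # other groups: use their own queue port as the namespace key
    return parallel_config.engine_worker_queue_port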

@@ -643,12 +647,14 @@ def initialize_fd_config(args, ranks: int = 1, local_rank: int = 0) -> FDConfig:
 
     num_experts_per_rank = num_experts // args.expert_parallel_size
     num_experts_start_offset = expert_parallel_rank * num_experts_per_rank
+    max_chips_per_node = 16 if current_platform.is_iluvatar() else 8
+    parallel_config.local_data_parallel_id = expert_parallel_rank % max_chips_per_node
 
     parallel_config.expert_parallel_rank = expert_parallel_rank
     parallel_config.num_experts_per_rank = num_experts_per_rank
     parallel_config.num_experts_start_offset = num_experts_start_offset
     parallel_config.engine_worker_queue_port = parallel_config.engine_worker_queue_port[
-        parallel_config.expert_parallel_rank
+        parallel_config.local_data_parallel_id
     ]
 
     load_config = LoadConfig(vars(args))
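
The last hunk is the index fix named in the commit title: the per-rank queue port is now looked up with the node-local data-parallel id instead of the global expert-parallel rank, and local_data_parallel_id itself is the rank modulo the chips available on one node (16 on Iluvatar hardware, 8 otherwise). A worked example of why that matters, under the assumption that each node configures one engine_worker_queue_port per local data-parallel group (the numbers below are illustrative, not from the commit):

# Assumed setup: 16 expert-parallel ranks over two 8-chip nodes, with
# 8 ports configured on the local node.
ports = [8002 + i for i in range(8)]
max_chips_per_node = 8  # 16 if current_platform.is_iluvatar(), per the diff

for expert_parallel_rank in range(8, 16):  # ranks assumed to live on node 2
    local_data_parallel_id = expert_parallel_rank % max_chips_per_node  # 0..7
    # Old indexing, ports[expert_parallel_rank], would reach past the end of
    # this 8-element list; the new ports[local_data_parallel_id] stays in range.
    port = ports[local_data_parallel_id]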

0 commit comments