Skip to content

Commit 08ae99f

Browse files
committed
Opt:ascend kv separation; dsv3 support graph; fused dequant+swiglu+quant; dp load balance
1 parent 1ec9769 commit 08ae99f

File tree

10 files changed

+445
-96
lines changed

10 files changed

+445
-96
lines changed

python/sglang/srt/disaggregation/ascend/conn.py

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,12 @@
1+
import concurrent.futures
12
import logging
3+
from typing import List, Tuple
4+
5+
import numpy as np
6+
import numpy.typing as npt
27

38
from sglang.srt.disaggregation.ascend.transfer_engine import AscendTransferEngine
9+
from sglang.srt.disaggregation.common.utils import group_concurrent_contiguous
410
from sglang.srt.disaggregation.mooncake.conn import (
511
MooncakeKVBootstrapServer,
612
MooncakeKVManager,
@@ -29,6 +35,77 @@ def register_buffer_to_engine(self):
2935
self.kv_args.aux_data_ptrs, self.kv_args.aux_data_lens
3036
)
3137

38+
def send_kvcache(
39+
self,
40+
mooncake_session_id: str,
41+
prefill_kv_indices: npt.NDArray[np.int32],
42+
dst_kv_ptrs: list[int],
43+
dst_kv_indices: npt.NDArray[np.int32],
44+
executor: concurrent.futures.ThreadPoolExecutor,
45+
):
46+
# Group by indices
47+
prefill_kv_blocks, dst_kv_blocks = group_concurrent_contiguous(
48+
prefill_kv_indices, dst_kv_indices
49+
)
50+
51+
num_layers = len(self.kv_args.kv_data_ptrs)
52+
layers_params = [
53+
(
54+
self.kv_args.kv_data_ptrs[layer_id],
55+
dst_kv_ptrs[layer_id],
56+
self.kv_args.kv_item_lens[layer_id],
57+
)
58+
for layer_id in range(num_layers)
59+
]
60+
61+
assert layers_params is not None
62+
63+
def set_transfer_blocks(
64+
src_ptr: int, dst_ptr: int, item_len: int
65+
) -> List[Tuple[int, int, int]]:
66+
transfer_blocks = []
67+
for prefill_index, decode_index in zip(prefill_kv_blocks, dst_kv_blocks):
68+
src_addr = src_ptr + int(prefill_index[0]) * item_len
69+
dst_addr = dst_ptr + int(decode_index[0]) * item_len
70+
length = item_len * len(prefill_index)
71+
transfer_blocks.append((src_addr, dst_addr, length))
72+
return transfer_blocks
73+
74+
# Worker function for processing a single layer
75+
def process_layer(src_ptr: int, dst_ptr: int, item_len: int) -> int:
76+
transfer_blocks = set_transfer_blocks(src_ptr, dst_ptr, item_len)
77+
return self._transfer_data(mooncake_session_id, transfer_blocks)
78+
79+
# Worker function for processing all layers in a batch
80+
def process_layers(layers_params: List[Tuple[int, int, int]]) -> int:
81+
transfer_blocks = []
82+
for src_ptr, dst_ptr, item_len in layers_params:
83+
transfer_blocks.extend(set_transfer_blocks(src_ptr, dst_ptr, item_len))
84+
return self._transfer_data(mooncake_session_id, transfer_blocks)
85+
86+
if self.enable_custom_mem_pool:
87+
futures = [
88+
executor.submit(
89+
process_layer,
90+
src_ptr,
91+
dst_ptr,
92+
item_len,
93+
)
94+
for (src_ptr, dst_ptr, item_len) in layers_params
95+
]
96+
for future in concurrent.futures.as_completed(futures):
97+
status = future.result()
98+
if status != 0:
99+
for f in futures:
100+
f.cancel()
101+
return status
102+
else:
103+
# Combining all layers' params in one batch transfer is more efficient
104+
# compared to using multiple threads
105+
return process_layers(layers_params)
106+
107+
return 0
108+
32109

33110
class AscendKVSender(MooncakeKVSender):
34111
pass

python/sglang/srt/disaggregation/utils.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -353,9 +353,14 @@ def register_disaggregation_server(
353353

354354

355355
def is_mla_backend(target_kv_pool) -> bool:
356-
from sglang.srt.mem_cache.memory_pool import MLATokenToKVPool
356+
from sglang.srt.mem_cache.memory_pool import (
357+
AscendMLAPagedTokenToKVPool,
358+
MLATokenToKVPool,
359+
)
357360

358-
return isinstance(target_kv_pool, MLATokenToKVPool)
361+
return isinstance(target_kv_pool, MLATokenToKVPool) or isinstance(
362+
target_kv_pool, AscendMLAPagedTokenToKVPool
363+
)
359364

360365

361366
def prepare_abort(req: Req, error_message: str, status_code=None):

0 commit comments

Comments
 (0)