Commit 518cdfb

Authored by Spycsh, pre-commit-ci[bot], chensuyue, and ZePan110
add dynamic batching embedding/reranking (opea-project#774)
* draft static batching embedding/reranking on single Gaudi card
* [pre-commit.ci] auto fixes from pre-commit.com hooks; for more information, see https://pre-commit.ci
* fix
* resolve segfault, deadlock and other issues
* [pre-commit.ci] auto fixes from pre-commit.com hooks; for more information, see https://pre-commit.ci
* narrow down default timeout
* add dockerfile
* fix hpu local microservice start
* openai format
* [pre-commit.ci] auto fixes from pre-commit.com hooks; for more information, see https://pre-commit.ci
* configurable timeout
* lower timeout
* fix
* lower default timeout
* bf16
* log, pad max_len
* autocast, 128
* fix acc issue
* perf fallback with no acc drop
* revert no-padding ones
* [pre-commit.ci] auto fixes from pre-commit.com hooks; for more information, see https://pre-commit.ci
* fix hpu graph wrapper
* [pre-commit.ci] auto fixes from pre-commit.com hooks; for more information, see https://pre-commit.ci
* add padding batch
* [pre-commit.ci] auto fixes from pre-commit.com hooks; for more information, see https://pre-commit.ci
* habana 1.18
* static -> dynamic
* add UT, add param in_single_process
* [pre-commit.ci] auto fixes from pre-commit.com hooks; for more information, see https://pre-commit.ci
* add docker file
* fix case doc empty, and pass model id from env
* [pre-commit.ci] auto fixes from pre-commit.com hooks; for more information, see https://pre-commit.ci
* CI

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: chen, suyue <suyue.chen@intel.com>
Co-authored-by: ZePan110 <ze.pan@intel.com>
1 parent: a8e5adc · commit: 518cdfb

File tree

5 files changed: +439 −3 lines

.github/workflows/docker/compose/embeddings-compose-cd.yaml

4 additions, 0 deletions

@@ -22,3 +22,7 @@ services:
     build:
       dockerfile: comps/embeddings/predictionguard/Dockerfile
     image: ${REGISTRY:-opea}/embedding-predictionguard:${TAG:-latest}
+  embedding-reranking-local:
+    build:
+      dockerfile: comps/embeddings/tei/langchain/Dockerfile.dynamic_batching
+    image: ${REGISTRY:-opea}/embedding-reranking-local:${TAG:-latest}

comps/cores/mega/micro_service.py

66 additions, 3 deletions

@@ -3,14 +3,21 @@
 
 import asyncio
 import multiprocessing
+import os
+from collections import defaultdict, deque
+from enum import Enum
 from typing import Any, List, Optional, Type
 
 from ..proto.docarray import TextDoc
 from .constants import ServiceRoleType, ServiceType
+from .logger import CustomLogger
 from .utils import check_ports_availability
 
 opea_microservices = {}
 
+logger = CustomLogger("micro_service")
+logflag = os.getenv("LOGFLAG", False)
+
 
 class MicroService:
     """MicroService class to create a microservice."""
@@ -31,6 +38,9 @@ def __init__(
         provider: Optional[str] = None,
         provider_endpoint: Optional[str] = None,
         use_remote_service: Optional[bool] = False,
+        dynamic_batching: bool = False,
+        dynamic_batching_timeout: int = 1,
+        dynamic_batching_max_batch_size: int = 32,
     ):
         """Init the microservice."""
         self.name = f"{name}/{self.__class__.__name__}" if name else self.__class__.__name__
@@ -43,6 +53,9 @@ def __init__(
         self.input_datatype = input_datatype
         self.output_datatype = output_datatype
        self.use_remote_service = use_remote_service
+        self.dynamic_batching = dynamic_batching
+        self.dynamic_batching_timeout = dynamic_batching_timeout
+        self.dynamic_batching_max_batch_size = dynamic_batching_max_batch_size
         self.uvicorn_kwargs = {}
 
         if ssl_keyfile:
@@ -58,10 +71,50 @@ def __init__(
 
         self.server = self._get_server()
         self.app = self.server.app
+        # create a batch request processor loop if using dynamic batching
+        if self.dynamic_batching:
+            self.buffer_lock = asyncio.Lock()
+            self.request_buffer = defaultdict(deque)
+
+            @self.app.on_event("startup")
+            async def startup_event():
+                asyncio.create_task(self._dynamic_batch_processor())
+
         self.event_loop = asyncio.new_event_loop()
         asyncio.set_event_loop(self.event_loop)
         self.event_loop.run_until_complete(self._async_setup())
 
+    async def _dynamic_batch_processor(self):
+        if logflag:
+            logger.info("dynamic batch processor looping...")
+        while True:
+            await asyncio.sleep(self.dynamic_batching_timeout)
+            runtime_batch: dict[Enum, list[dict]] = {}  # {ServiceType.Embedding: [{"request": xx, "response": yy}, {}]}
+
+            async with self.buffer_lock:
+                # prepare the runtime batch, access to buffer is locked
+                if self.request_buffer:
+                    for service_type, request_lst in self.request_buffer.items():
+                        batch = []
+                        # grab min(MAX_BATCH_SIZE, REQUEST_SIZE) requests from buffer
+                        for _ in range(min(self.dynamic_batching_max_batch_size, len(request_lst))):
+                            batch.append(request_lst.popleft())
+
+                        runtime_batch[service_type] = batch
+
+            # Run batched inference on the batch and set results
+            for service_type, batch in runtime_batch.items():
+                if not batch:
+                    continue
+                results = await self.dynamic_batching_infer(service_type, batch)
+
+                for req, result in zip(batch, results):
+                    req["response"].set_result(result)
+
+    async def dynamic_batching_infer(self, service_type: Enum, batch: list[dict]):
+        """Need to implement."""
+        raise NotImplementedError("Unimplemented dynamic batching inference!")
+
     def _validate_env(self):
         """Check whether to use the microservice locally."""
         if self.use_remote_service:
@@ -116,10 +169,14 @@ def run(self):
         self._validate_env()
         self.event_loop.run_until_complete(self._async_run_forever())
 
-    def start(self):
+    def start(self, in_single_process=False):
         self._validate_env()
-        self.process = multiprocessing.Process(target=self.run, daemon=False, name=self.name)
-        self.process.start()
+        if in_single_process:
+            # Resolve HPU segmentation fault and potential tokenizer issues by limiting to same process
+            self.run()
+        else:
+            self.process = multiprocessing.Process(target=self.run, daemon=False, name=self.name)
+            self.process.start()
 
     async def _async_teardown(self):
         """Shutdown the server."""
@@ -155,6 +212,9 @@ def register_microservice(
     provider: Optional[str] = None,
    provider_endpoint: Optional[str] = None,
    methods: List[str] = ["POST"],
+    dynamic_batching: bool = False,
+    dynamic_batching_timeout: int = 1,
+    dynamic_batching_max_batch_size: int = 32,
 ):
     def decorator(func):
         if name not in opea_microservices:
@@ -172,6 +232,9 @@ def decorator(func):
                 output_datatype=output_datatype,
                 provider=provider,
                 provider_endpoint=provider_endpoint,
+                dynamic_batching=dynamic_batching,
+                dynamic_batching_timeout=dynamic_batching_timeout,
+                dynamic_batching_max_batch_size=dynamic_batching_max_batch_size,
             )
             opea_microservices[name] = micro_service
             opea_microservices[name].app.router.add_api_route(endpoint, func, methods=methods)
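
The new dynamic_batching_infer hook is deliberately left raising NotImplementedError, and the processor loop expects request_buffer entries shaped as {"request": ..., "response": ...}, resolving each "response" via set_result(). Below is a minimal sketch of how a service could plug into this; it is not the commit's actual local_embedding_reranking.py. It assumes "response" holds an asyncio.Future that the handler awaits, that ServiceType.EMBEDDING is a member of the ServiceType enum imported from .constants, and that embed_batch() stands in for a real model call:

import asyncio

from comps.cores.mega.constants import ServiceType  # ServiceType.EMBEDDING assumed
from comps.cores.mega.micro_service import MicroService


def embed_batch(texts):
    # Placeholder for a real (e.g. optimum-habana) embedding model call;
    # must return exactly one result per input, in order.
    return [[0.0] * 768 for _ in texts]


class LocalEmbeddingService(MicroService):
    """Hypothetical subclass supplying the batched inference step."""

    async def dynamic_batching_infer(self, service_type, batch):
        # One forward pass over the whole batch popped by _dynamic_batch_processor().
        texts = [item["request"] for item in batch]
        return embed_batch(texts)


async def handle_request(service: MicroService, payload):
    """Enqueue one request and wait for the processor loop to fulfil it."""
    # buffer_lock and request_buffer exist only when dynamic_batching=True.
    response = asyncio.get_running_loop().create_future()
    async with service.buffer_lock:
        service.request_buffer[ServiceType.EMBEDDING].append(
            {"request": payload, "response": response}
        )
    # Within dynamic_batching_timeout seconds the processor pops this entry,
    # batches it with up to dynamic_batching_max_batch_size - 1 others, and
    # resolves the future via req["response"].set_result(result).
    return await response

Because results are matched back to requests with zip(batch, results), any implementation of dynamic_batching_infer must return one result per request, in the order the batch was popped.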
comps/embeddings/tei/langchain/Dockerfile.dynamic_batching

28 additions, 0 deletions (new file)

@@ -0,0 +1,28 @@
+# Copyright (C) 2024 Intel Corporation
+# SPDX-License-Identifier: Apache-2.0
+
+# FROM opea/habanalabs:1.16.1-pytorch-installer-2.2.2 as hpu
+FROM vault.habana.ai/gaudi-docker/1.18.0/ubuntu22.04/habanalabs/pytorch-installer-2.4.0:latest as hpu
+
+RUN apt-get update -y && apt-get install -y --no-install-recommends --fix-missing \
+    libgl1-mesa-glx \
+    libjemalloc-dev
+
+RUN useradd -m -s /bin/bash user && \
+    mkdir -p /home/user && \
+    chown -R user /home/user/
+
+# Disable user for now
+# USER user
+
+COPY comps /home/user/comps
+
+RUN pip install --no-cache-dir --upgrade pip && \
+    pip install --no-cache-dir -r /home/user/comps/embeddings/tei/langchain/requirements.txt && \
+    pip install git+https://github.com/huggingface/optimum-habana.git
+
+ENV PYTHONPATH=$PYTHONPATH:/home/user
+
+WORKDIR /home/user/comps/embeddings/tei/langchain
+
+ENTRYPOINT ["python", "local_embedding_reranking.py"]
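
Since the image's entrypoint is local_embedding_reranking.py, wiring the new parameters together might look like the sketch below. The service name, endpoint, port, and import path are illustrative, not taken from the commit; the dynamic_batching* keyword names and start(in_single_process=True) follow the diff to micro_service.py:

from comps import opea_microservices, register_microservice  # assumed import path


@register_microservice(
    name="opea_service@local_embedding_reranking",  # illustrative
    endpoint="/v1/embeddings",                      # illustrative
    host="0.0.0.0",
    port=6001,
    dynamic_batching=True,               # start the batch processor at startup
    dynamic_batching_timeout=1,          # seconds between batch sweeps
    dynamic_batching_max_batch_size=32,  # cap per service type per sweep
)
async def embedding(request):
    # Enqueue into the shared buffer and await the future, as sketched earlier.
    ...


if __name__ == "__main__":
    # in_single_process=True calls run() directly instead of forking a
    # multiprocessing.Process, which the diff notes resolves HPU segmentation
    # faults and tokenizer issues.
    opea_microservices["opea_service@local_embedding_reranking"].start(in_single_process=True)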
