
Commit 30bdc88

Author: liyang (committed)
introduce lazy prefetch
1 parent 0dd2329 commit 30bdc88

6 files changed: 46 additions, 24 deletions


megfile/config.py
Lines changed: 1 addition & 0 deletions

@@ -83,6 +83,7 @@ def set_log_level(level: T.Optional[T.Union[int, str]] = None):
 READER_MAX_BUFFER_SIZE = parse_quantity(
     os.getenv("MEGFILE_READER_MAX_BUFFER_SIZE") or 128 * 2**20
 )
+READER_LAZY_PREFETCH = parse_boolean(os.getenv("MEGFILE_READER_LAZY_PREFETCH"), False)
 
 # Multi-upload in aws s3 has a maximum of 10,000 parts,
 # so the maximum supported file size is MEGFILE_WRITE_BLOCK_SIZE * 10,000,
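
The flag is read from the environment once, when megfile.config is imported, and defaults to False. A minimal sketch of enabling it from Python follows; the value "true" and the set-before-import ordering are assumptions about parse_boolean and import-time evaluation:

import os

# Assumed: the flag must be set before megfile is first imported,
# since READER_LAZY_PREFETCH is evaluated when megfile.config loads.
os.environ["MEGFILE_READER_LAZY_PREFETCH"] = "true"

import megfile  # noqa: E402

Equivalently, export MEGFILE_READER_LAZY_PREFETCH in the shell before starting the process.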

megfile/lib/base_prefetch_reader.py
Lines changed: 16 additions & 7 deletions

@@ -82,9 +82,7 @@ def __init__(
 
         self._offset = 0
         self._cached_buffer = None
-        self._block_index = None  # Current block index
         self._seek_history = []
-
         self._seek_buffer(0)
 
         _logger.debug("open file: %r, mode: %s" % (self.name, self.mode))
@@ -98,7 +96,9 @@ def _futures(self) -> "LRUCacheFutureManager":
         return self._process_local("futures", self._get_futures)
 
     def _get_futures(self):
-        return LRUCacheFutureManager()
+        futures = LRUCacheFutureManager()
+        futures.register(self.name)
+        return futures
 
     @property
     @abstractmethod
@@ -207,9 +207,8 @@ def _read(self, size: int) -> bytes:
         if size == 0 or self._offset >= self._content_size:
             return b""
 
-        data = self._fetch_response(start=self._offset, end=self._offset + size - 1)[
-            "Body"
-        ].read()
+        resp = self._fetch_response(start=self._offset, end=self._offset + size - 1)
+        data = resp["Body"].read()
         self.seek(size, os.SEEK_CUR)
         return data
 
@@ -369,12 +368,17 @@ def _close(self):
 class LRUCacheFutureManager(OrderedDict):
     def __init__(self):
         super().__init__()
+        self._name = None
+
+    def register(self, name):
+        self._name = name
 
     def submit(self, executor, key, *args, **kwargs):
         if key in self:
             self.move_to_end(key, last=True)
             return
         self[key] = executor.submit(*args, **kwargs)
+        _logger.debug("submit future: %r, key: %r" % (self._name, key))
 
     @property
     def finished(self):
@@ -385,7 +389,12 @@ def result(self, key):
         return self[key].result()
 
     def cleanup(self, block_capacity: int):
+        keys = []
         while len(self) > block_capacity:
-            _, future = self.popitem(last=False)
+            key, future = self.popitem(last=False)
+            keys.append(key)
             if not future.done():
                 future.cancel()
+        if keys:
+            _logger.debug("cleanup futures: %r, keys: %s" % (self._name, keys))
+        return keys
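
Taken together, these hunks let an LRUCacheFutureManager know which file it belongs to (register), log each newly submitted block future, and report which block keys it evicted during cleanup. Below is a self-contained sketch that mirrors the class as it appears in this diff rather than importing megfile's module; the reader name and block payloads are made up for illustration:

import logging
from collections import OrderedDict
from concurrent.futures import ThreadPoolExecutor

logging.basicConfig(level=logging.DEBUG)
_logger = logging.getLogger("lru_sketch")


class LRUCacheFutureManager(OrderedDict):
    """Standalone sketch mirroring the manager in this commit."""

    def __init__(self):
        super().__init__()
        self._name = None  # set by register(), used only in log messages

    def register(self, name):
        self._name = name

    def submit(self, executor, key, *args, **kwargs):
        if key in self:
            self.move_to_end(key, last=True)  # refresh LRU order, keep the future
            return
        self[key] = executor.submit(*args, **kwargs)
        _logger.debug("submit future: %r, key: %r" % (self._name, key))

    def cleanup(self, block_capacity):
        keys = []
        while len(self) > block_capacity:
            key, future = self.popitem(last=False)  # evict the oldest entry
            keys.append(key)
            if not future.done():
                future.cancel()
        if keys:
            _logger.debug("cleanup futures: %r, keys: %s" % (self._name, keys))
        return keys  # evicted keys are now returned to the caller


with ThreadPoolExecutor(max_workers=2) as executor:
    futures = LRUCacheFutureManager()
    futures.register("s3://bucket/key")           # hypothetical reader name
    for block_index in range(4):
        futures.submit(executor, block_index, lambda i=block_index: i * 2)
    print(futures.cleanup(block_capacity=2))      # -> [0, 1], the evicted block keys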

megfile/lib/s3_prefetch_reader.py
Lines changed: 2 additions & 1 deletion

@@ -4,6 +4,7 @@
 
 from megfile.config import (
     READER_BLOCK_SIZE,
+    READER_LAZY_PREFETCH,
     READER_MAX_BUFFER_SIZE,
     S3_MAX_RETRY_TIMES,
 )
@@ -62,7 +63,7 @@ def __init__(
         )
 
     def _get_content_size(self):
-        if self._block_capacity <= 0:
+        if self._block_capacity <= 0 or READER_LAZY_PREFETCH:
             response = self._client.head_object(Bucket=self._bucket, Key=self._key)
             self._content_etag = response.get("ETag")
             return int(response["ContentLength"])
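
With the flag enabled, _get_content_size always takes the head_object branch, so the object's size and ETag come from a HEAD request rather than (presumably) from the first prefetched block's response. The branch boils down to the following boto3 calls; the bucket and key names here are placeholders:

import boto3

client = boto3.client("s3")
# HEAD request only -- no object bytes are downloaded to learn the size.
response = client.head_object(Bucket="my-bucket", Key="path/to/data.bin")
content_etag = response.get("ETag")
content_size = int(response["ContentLength"])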

megfile/lib/s3_share_cache_reader.py
Lines changed: 15 additions & 10 deletions

@@ -101,16 +101,21 @@ def __init__(self):
         super().__init__()
         self._references = Counter()
 
-    def register(self, key):
-        self._references[key] += 1
-
-    def unregister(self, key):
-        self._references[key] -= 1
-        if self._references[key] == 0:
-            self._references.pop(key)
-            for key_tuple in list(self):
-                if key_tuple[0] != key:
+    def register(self, name):
+        self._references[name] += 1
+        _logger.debug("register reader: %r, count: %d" % (name, self._references[name]))
+
+    def unregister(self, name):
+        self._references[name] -= 1
+        _logger.debug(
+            "unregister reader: %r, count: %d" % (name, self._references[name])
+        )
+        if self._references[name] == 0:
+            self._references.pop(name)
+            for key in list(self):
+                if key[0] != name:
                     continue
-                future = self.pop(key_tuple)
+                future = self.pop(key)
                 if not future.done():
                     future.cancel()  # pragma: no cover
+            _logger.debug("cleanup all futures of reader: %r" % name)

megfile/s3_path.py
Lines changed: 8 additions & 6 deletions

@@ -1000,6 +1000,7 @@ def s3_buffered_open(
     )
 
     if mode == "rb":
+        block_size = block_size or READER_BLOCK_SIZE
         if share_cache_key is not None:
             reader = S3ShareCacheReader(
                 bucket,
@@ -1008,7 +1009,7 @@
                 s3_client=client,
                 max_retries=max_retries,
                 max_workers=max_workers,
-                block_size=block_size or READER_BLOCK_SIZE,
+                block_size=block_size,
                 block_forward=block_forward,
                 profile_name=s3_url._profile_name,
             )
@@ -1023,13 +1024,14 @@
                 max_workers=max_workers,
                 max_buffer_size=max_buffer_size,
                 block_forward=block_forward,
-                block_size=block_size or READER_BLOCK_SIZE,
+                block_size=block_size,
                 profile_name=s3_url._profile_name,
             )
         if buffered or _is_pickle(reader):
-            reader = io.BufferedReader(reader)  # type: ignore
+            reader = io.BufferedReader(reader, buffer_size=block_size)  # type: ignore
         return reader
 
+    block_size = block_size or WRITER_BLOCK_SIZE
     if limited_seekable:
         if max_buffer_size is None:
             max_buffer_size = WRITER_MAX_BUFFER_SIZE
@@ -1038,7 +1040,7 @@
             key,
             s3_client=client,
             max_workers=max_workers,
-            block_size=block_size or WRITER_BLOCK_SIZE,
+            block_size=block_size,
             max_buffer_size=max_buffer_size,
             profile_name=s3_url._profile_name,
         )
@@ -1050,12 +1052,12 @@
             key,
             s3_client=client,
             max_workers=max_workers,
-            block_size=block_size or WRITER_BLOCK_SIZE,
+            block_size=block_size,
             max_buffer_size=max_buffer_size,
             profile_name=s3_url._profile_name,
         )
     if buffered or _is_pickle(writer):
-        writer = io.BufferedWriter(writer)  # type: ignore
+        writer = io.BufferedWriter(writer, buffer_size=block_size)  # type: ignore
     return writer
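
Two things change here: block_size is resolved once at the top of the read and write branches, and that resolved value is also passed as buffer_size to io.BufferedReader / io.BufferedWriter, so the standard-library wrapper buffers in block-sized chunks instead of its default io.DEFAULT_BUFFER_SIZE (8 KiB). A minimal standard-library illustration of what the buffer_size argument changes, using an in-memory stream as a stand-in for the S3 reader:

import io

raw = io.BytesIO(b"x" * (1 << 20))  # stand-in for the underlying S3 prefetch reader
# With this commit, buffer_size matches the reader's block size (e.g. 8 MiB,
# used here only as an example value) instead of io.DEFAULT_BUFFER_SIZE.
buffered = io.BufferedReader(raw, buffer_size=8 * 2**20)
print(len(buffered.read(10)))  # -> 10; reads are served through the larger buffer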

megfile/utils/__init__.py
Lines changed: 4 additions & 0 deletions

@@ -18,6 +18,7 @@
 from threading import RLock
 from typing import IO, Callable, Optional
 
+from megfile.config import READER_LAZY_PREFETCH
 from megfile.utils.mutex import ProcessLocal, ThreadLocal
 
 
@@ -81,6 +82,9 @@ def is_writable(fileobj: IO) -> bool:
 
 def _is_pickle(fileobj) -> bool:
     """Test if File Object is pickle"""
+    if READER_LAZY_PREFETCH:
+        return False
+
     if fileobj.name.endswith(".pkl") or fileobj.name.endswith(".pickle"):
         return True
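
When lazy prefetch is enabled, _is_pickle short-circuits to False before any filename check, so .pkl/.pickle objects no longer receive the pickle-specific treatment (and, via the s3_path.py change above, are no longer force-wrapped in a buffered reader on that basis alone). A tiny standalone restatement of the new behaviour; only the filename heuristic visible in this hunk is reproduced, and the module-level constant is a stand-in for the flag imported from megfile.config:

READER_LAZY_PREFETCH = True  # stand-in for the megfile.config flag


def _is_pickle(fileobj) -> bool:
    """Sketch of the updated helper: lazy prefetch disables the pickle heuristic."""
    if READER_LAZY_PREFETCH:
        return False
    return fileobj.name.endswith((".pkl", ".pickle"))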
