Skip to content

Commit bcc2567

Browse files
🐛 Ensure chunk file download (#206)
1 parent 80b81c5 commit bcc2567

File tree

10 files changed

+174
-84
lines changed

10 files changed

+174
-84
lines changed

clients/python/requirements/e2e-test.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ ipykernel
55
ipython
66
jinja2
77
matplotlib
8+
memory_profiler
89
packaging
910
pandas
1011
papermill

clients/python/setup.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
"tenacity",
3131
"tqdm>=4.48.0",
3232
f"osparc_client=={VERSION}",
33+
"aiofiles",
3334
]
3435

3536
SETUP = dict(

clients/python/src/osparc/_api_files_api.py

Lines changed: 51 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,6 @@
44
import json
55
import logging
66
import math
7-
import random
8-
import shutil
9-
import string
107
from pathlib import Path
118
from typing import Any, Iterator, List, Optional, Tuple, Union
129

@@ -28,6 +25,10 @@
2825
FileUploadData,
2926
UploadedPart,
3027
)
28+
from urllib.parse import urljoin
29+
import aiofiles
30+
from tempfile import NamedTemporaryFile
31+
import shutil
3132
from ._utils import (
3233
DEFAULT_TIMEOUT_SECONDS,
3334
PaginationGenerator,
@@ -65,25 +66,57 @@ def __getattr__(self, name: str) -> Any:
6566
return super().__getattribute__(name)
6667

6768
def download_file(
68-
self, file_id: str, *, destination_folder: Optional[Path] = None, **kwargs
69+
self,
70+
file_id: str,
71+
*,
72+
destination_folder: Optional[Path] = None,
73+
timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS,
74+
**kwargs,
75+
) -> str:
76+
return asyncio.run(
77+
self.download_file_async(
78+
file_id=file_id,
79+
destination_folder=destination_folder,
80+
timeout_seconds=timeout_seconds,
81+
**kwargs,
82+
)
83+
)
84+
85+
async def download_file_async(
86+
self,
87+
file_id: str,
88+
*,
89+
destination_folder: Optional[Path] = None,
90+
timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS,
91+
**kwargs,
6992
) -> str:
7093
if destination_folder is not None and not destination_folder.is_dir():
7194
raise RuntimeError(
7295
f"destination_folder: {destination_folder} must be a directory"
7396
)
74-
downloaded_file: Path = Path(super().download_file(file_id, **kwargs))
75-
if destination_folder is not None:
76-
dest_file: Path = destination_folder / downloaded_file.name
77-
while dest_file.is_file():
78-
new_name = (
79-
downloaded_file.stem
80-
+ "".join(random.choices(string.ascii_letters, k=8))
81-
+ downloaded_file.suffix
97+
async with aiofiles.tempfile.NamedTemporaryFile(
98+
mode="wb",
99+
delete=False,
100+
) as downloaded_file:
101+
async with AsyncHttpClient(
102+
configuration=self.api_client.configuration, timeout=timeout_seconds
103+
) as session:
104+
url = urljoin(
105+
self.api_client.configuration.host, f"/v0/files/{file_id}/content"
82106
)
83-
dest_file = destination_folder / new_name
84-
shutil.move(downloaded_file, dest_file)
85-
downloaded_file = dest_file
86-
return str(downloaded_file.resolve())
107+
async for response in await session.stream(
108+
"GET", url=url, auth=self._auth, follow_redirects=True
109+
):
110+
response.raise_for_status()
111+
async for chunk in response.aiter_bytes():
112+
await downloaded_file.write(chunk)
113+
dest_file = f"{downloaded_file.name}"
114+
if destination_folder is not None:
115+
dest_file = NamedTemporaryFile(dir=destination_folder, delete=False).name
116+
shutil.move(
117+
f"{downloaded_file.name}", dest_file
118+
) # aiofiles doesn't seem to have an async variant of this
119+
return dest_file
87120

88121
def upload_file(
89122
self,
@@ -105,7 +138,7 @@ async def upload_file_async(
105138
file = Path(file)
106139
if not file.is_file():
107140
raise RuntimeError(f"{file} is not a file")
108-
checksum: str = compute_sha256(file)
141+
checksum: str = await compute_sha256(file)
109142
for file_result in self._search_files(
110143
sha256_checksum=checksum, timeout_seconds=timeout_seconds
111144
):
@@ -159,7 +192,7 @@ async def upload_file_async(
159192
)
160193
async with AsyncHttpClient(
161194
configuration=self.api_client.configuration,
162-
request_type="post",
195+
method="post",
163196
url=links.abort_upload,
164197
body=abort_body.to_dict(),
165198
base_url=self.api_client.configuration.host,

clients/python/src/osparc/_http_client.py

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,16 @@
11
from contextlib import suppress
22
from datetime import datetime
33
from email.utils import parsedate_to_datetime
4-
from typing import Any, Awaitable, Callable, Dict, Optional, Set
4+
from typing import (
5+
Any,
6+
Awaitable,
7+
Callable,
8+
Dict,
9+
Optional,
10+
Set,
11+
Literal,
12+
AsyncGenerator,
13+
)
514

615
import httpx
716
import tenacity
@@ -17,14 +26,14 @@ def __init__(
1726
self,
1827
*,
1928
configuration: Configuration,
20-
request_type: Optional[str] = None,
29+
method: Optional[str] = None,
2130
url: Optional[str] = None,
2231
body: Optional[Dict] = None,
2332
**httpx_async_client_kwargs,
2433
):
2534
self.configuration = configuration
2635
self._client = httpx.AsyncClient(**httpx_async_client_kwargs)
27-
self._callback = getattr(self._client, request_type) if request_type else None
36+
self._callback = getattr(self._client, method) if method else None
2837
self._url = url
2938
self._body = body
3039
if self._callback is not None:
@@ -77,6 +86,28 @@ async def _():
7786

7887
return await _()
7988

89+
async def _stream(
90+
self, method: Literal["GET"], url: str, *args, **kwargs
91+
) -> AsyncGenerator[httpx.Response, None]:
92+
n_attempts = self.configuration.retries.total
93+
assert isinstance(n_attempts, int)
94+
95+
@tenacity.retry(
96+
reraise=True,
97+
wait=self._wait_callback,
98+
stop=tenacity.stop_after_attempt(n_attempts),
99+
retry=tenacity.retry_if_exception_type(httpx.HTTPStatusError),
100+
)
101+
async def _() -> AsyncGenerator[httpx.Response, None]:
102+
async with self._client.stream(
103+
method=method, url=url, *args, **kwargs
104+
) as response:
105+
if response.status_code in self.configuration.retries.status_forcelist:
106+
response.raise_for_status()
107+
yield response
108+
109+
return _()
110+
80111
async def put(self, *args, **kwargs) -> httpx.Response:
81112
return await self._request(self._client.put, *args, **kwargs)
82113

@@ -92,6 +123,11 @@ async def patch(self, *args, **kwargs) -> httpx.Response:
92123
async def get(self, *args, **kwargs) -> httpx.Response:
93124
return await self._request(self._client.get, *args, **kwargs)
94125

126+
async def stream(
127+
self, method: Literal["GET"], url: str, *args, **kwargs
128+
) -> AsyncGenerator[httpx.Response, None]:
129+
return await self._stream(method=method, url=url, *args, **kwargs)
130+
95131
def _wait_callback(self, retry_state: tenacity.RetryCallState) -> int:
96132
assert retry_state.outcome is not None
97133
if retry_state.outcome and retry_state.outcome.exception():

clients/python/src/osparc/_utils.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
Solver,
1717
Study,
1818
)
19-
19+
import aiofiles
2020
from ._exceptions import RequestError
2121

2222
_KB = 1024 # in bytes
@@ -87,15 +87,15 @@ async def file_chunk_generator(
8787
bytes_read: int = 0
8888
file_size: int = file.stat().st_size
8989
while bytes_read < file_size:
90-
with open(file, "rb") as f:
91-
f.seek(bytes_read)
90+
async with aiofiles.open(file, "rb") as f:
91+
await f.seek(bytes_read)
9292
nbytes = (
9393
chunk_size
9494
if (bytes_read + chunk_size <= file_size)
9595
else (file_size - bytes_read)
9696
)
9797
assert nbytes > 0
98-
chunk = await asyncio.get_event_loop().run_in_executor(None, f.read, nbytes)
98+
chunk = await f.read(nbytes)
9999
yield chunk, nbytes
100100
bytes_read += nbytes
101101

@@ -109,16 +109,16 @@ async def _fcn_to_coro(callback: Callable[..., S], *args) -> S:
109109
return result
110110

111111

112-
def compute_sha256(file: Path) -> str:
112+
async def compute_sha256(file: Path) -> str:
113113
assert file.is_file()
114114
sha256 = hashlib.sha256()
115-
with open(file, "rb") as f:
115+
async with aiofiles.open(file, "rb") as f:
116116
while True:
117-
data = f.read(100 * _KB)
117+
data = await f.read(100 * _KB)
118118
if not data:
119119
break
120120
sha256.update(data)
121-
return sha256.hexdigest()
121+
return sha256.hexdigest()
122122

123123

124124
def dev_features_enabled() -> bool:

clients/python/test/e2e/_utils.py

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -24,20 +24,6 @@ def repo_version() -> Version:
2424
return Version(version_file.read_text())
2525

2626

27-
def skip_if_no_dev_features(test):
28-
if (
29-
Version(osparc.__version__) < repo_version()
30-
or not osparc_dev_features_enabled()
31-
):
32-
return pytest.mark.skip(
33-
(
34-
f"{osparc.__version__=}<{str(repo_version)} "
35-
f"or {osparc_dev_features_enabled()=}"
36-
)
37-
)(test)
38-
return test
39-
40-
4127
def skip_if_osparc_version(
4228
*,
4329
at_least: Optional[Version] = None,

clients/python/test/e2e/conftest.py

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,14 @@
1717
from numpy import random
1818
from packaging.version import Version
1919
from pydantic import ByteSize
20+
from typing import Callable
2021

2122
try:
2223
from osparc._settings import ConfigurationEnvVars
2324
except ImportError:
2425
pass
2526

2627

27-
_KB: ByteSize = ByteSize(1024) # in bytes
28-
_MB: ByteSize = ByteSize(_KB * 1024) # in bytes
29-
_GB: ByteSize = ByteSize(_MB * 1024) # in bytes
30-
31-
3228
# Dictionary to store start times of tests
3329
_test_start_times = {}
3430

@@ -133,20 +129,24 @@ def async_client() -> Iterable[AsyncClient]:
133129

134130

135131
@pytest.fixture
136-
def tmp_file(tmp_path: Path, caplog: pytest.LogCaptureFixture) -> Path:
137-
caplog.set_level(logging.INFO)
138-
byte_size: ByteSize = 1 * _GB
139-
tmp_file = tmp_path / "large_test_file.txt"
140-
ss: random.SeedSequence = random.SeedSequence()
141-
logging.info("Entropy used to generate random file: %s", f"{ss.entropy}")
142-
rng: random.Generator = random.default_rng(ss)
143-
tmp_file.write_bytes(rng.bytes(1000))
144-
with open(tmp_file, "wb") as f:
145-
f.truncate(byte_size)
146-
assert (
147-
tmp_file.stat().st_size == byte_size
148-
), f"Could not create file of size: {byte_size}"
149-
return tmp_file
132+
def create_tmp_file(
133+
tmp_path: Path, caplog: pytest.LogCaptureFixture
134+
) -> Callable[[ByteSize], Path]:
135+
def _generate_file(file_size: ByteSize):
136+
caplog.set_level(logging.INFO)
137+
tmp_file = tmp_path / "large_test_file.txt"
138+
ss: random.SeedSequence = random.SeedSequence()
139+
logging.info("Entropy used to generate random file: %s", f"{ss.entropy}")
140+
rng: random.Generator = random.default_rng(ss)
141+
tmp_file.write_bytes(rng.bytes(1000))
142+
with open(tmp_file, "wb") as f:
143+
f.truncate(file_size)
144+
assert (
145+
tmp_file.stat().st_size == file_size
146+
), f"Could not create file of size: {file_size}"
147+
return tmp_file
148+
149+
return _generate_file
150150

151151

152152
@pytest.fixture

0 commit comments

Comments
 (0)