Commit 398ae76

ensure abort call is performed even if a single chunk upload fails
1 parent 1512952 commit 398ae76

File tree: 1 file changed, +45 -41 lines


clients/python/src/osparc/_api_files_api.py

Lines changed: 45 additions & 41 deletions
@@ -168,52 +168,53 @@ async def upload_file_async(
             "Did not receive sufficient number of upload URLs from the server."
         )
 
+        abort_body = BodyAbortMultipartUploadV0FilesFileIdAbortPost(
+            client_file=client_file
+        )
         upload_tasks: Set[asyncio.Task] = set()
         uploaded_parts: List[UploadedPart] = []
+
         async with AsyncHttpClient(
-            configuration=self.api_client.configuration, timeout=timeout_seconds
-        ) as session:
-            with logging_redirect_tqdm():
-                _logger.debug("Uploading %s in %i chunk(s)", file.name, n_urls)
-                async for chunck, size, is_final_chunk in tqdm(
-                    file_chunk_generator(file, chunk_size),
-                    total=n_urls,
-                    disable=(not _logger.isEnabledFor(logging.DEBUG)),
-                ):
-                    index, url = next(url_iter)
-                    upload_tasks.add(
-                        asyncio.create_task(
-                            self._upload_chunck(
-                                http_client=session,
-                                chunck=chunck,
-                                chunck_size=size,
-                                upload_link=url,
-                                index=index,
+            configuration=self.api_client.configuration,
+            method="post",
+            url=links.abort_upload,
+            body=abort_body.to_dict(),
+            base_url=self.api_client.configuration.host,
+            follow_redirects=True,
+            auth=self._auth,
+            timeout=timeout_seconds,
+        ) as api_server_session:
+            async with AsyncHttpClient(
+                configuration=self.api_client.configuration, timeout=timeout_seconds
+            ) as s3_session:
+                with logging_redirect_tqdm():
+                    _logger.debug("Uploading %s in %i chunk(s)", file.name, n_urls)
+                    async for chunck, size, is_final_chunk in tqdm(
+                        file_chunk_generator(file, chunk_size),
+                        total=n_urls,
+                        disable=(not _logger.isEnabledFor(logging.DEBUG)),
+                    ):  # type: ignore
+                        index, url = next(url_iter)
+                        upload_tasks.add(
+                            asyncio.create_task(
+                                self._upload_chunck(
+                                    http_client=s3_session,
+                                    chunck=chunck,
+                                    chunck_size=size,
+                                    upload_link=url,
+                                    index=index,
+                                )
                             )
                         )
-                    )
-                    while (len(upload_tasks) > max_concurrent_uploads) or (
-                        is_final_chunk and len(upload_tasks) > 0
-                    ):
-                        done, upload_tasks = await asyncio.wait(
-                            upload_tasks, return_when=asyncio.FIRST_COMPLETED
-                        )
-                        for task in done:
-                            uploaded_parts.append(task.result())
+                        while (len(upload_tasks) > max_concurrent_uploads) or (
+                            is_final_chunk and len(upload_tasks) > 0
+                        ):
+                            done, upload_tasks = await asyncio.wait(
+                                upload_tasks, return_when=asyncio.FIRST_COMPLETED
+                            )
+                            for task in done:
+                                uploaded_parts.append(task.result())
 
-        abort_body = BodyAbortMultipartUploadV0FilesFileIdAbortPost(
-            client_file=client_file
-        )
-        async with AsyncHttpClient(
-            configuration=self.api_client.configuration,
-            method="post",
-            url=links.abort_upload,
-            body=abort_body.to_dict(),
-            base_url=self.api_client.configuration.host,
-            follow_redirects=True,
-            auth=self._auth,
-            timeout=timeout_seconds,
-        ) as session:
             _logger.debug(
                 (
                     "Completing upload of %s "
@@ -222,7 +223,10 @@ async def upload_file_async(
                 file.name,
             )
             server_file: File = await self._complete_multipart_upload(
-                session, links.complete_upload, client_file, uploaded_parts
+                api_server_session,
+                links.complete_upload,
+                client_file,
+                uploaded_parts,
             )
             _logger.debug("File upload complete: %s", file.name)
             return server_file

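The shape of the fix: the HTTP client configured to POST to links.abort_upload now wraps the S3 upload session instead of being opened only after the upload loop finishes, so (presumably via AsyncHttpClient's exit handling) the abort request can still go out when a chunk upload raises. A minimal sketch of that pattern, using hypothetical stand-ins post_abort and upload_chunk rather than the client's real internals:

# Minimal sketch (not the osparc client code) of the pattern the commit relies on:
# an outer async context manager performs the abort call whenever the block it
# wraps raises, so a single failed chunk upload still aborts the multipart upload.
# `post_abort` and `upload_chunk` are hypothetical callables standing in for the
# client's abort request and per-chunk upload.
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def abort_on_failure(post_abort):
    try:
        yield
    except BaseException:
        # Mirrors the outer AsyncHttpClient in the diff: cleanup runs on any error
        # raised by the inner upload block, then the original error propagates.
        await post_abort()
        raise


async def upload_all(chunks, upload_chunk, post_abort):
    async with abort_on_failure(post_abort):
        # If any upload task fails, asyncio.gather re-raises and the abort
        # callback above is awaited before the exception leaves upload_all.
        await asyncio.gather(*(upload_chunk(chunk) for chunk in chunks))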