1
1
from contextlib import suppress
2
2
from datetime import datetime
3
3
from email .utils import parsedate_to_datetime
4
- from typing import Any , Awaitable , Callable , Dict , Optional , Set
4
+ from typing import (
5
+ Any ,
6
+ Awaitable ,
7
+ Callable ,
8
+ Dict ,
9
+ Optional ,
10
+ Set ,
11
+ Literal ,
12
+ AsyncGenerator ,
13
+ )
5
14
6
15
import httpx
7
16
import tenacity
@@ -17,14 +26,14 @@ def __init__(
17
26
self ,
18
27
* ,
19
28
configuration : Configuration ,
20
- request_type : Optional [str ] = None ,
29
+ method : Optional [str ] = None ,
21
30
url : Optional [str ] = None ,
22
31
body : Optional [Dict ] = None ,
23
32
** httpx_async_client_kwargs ,
24
33
):
25
34
self .configuration = configuration
26
35
self ._client = httpx .AsyncClient (** httpx_async_client_kwargs )
27
- self ._callback = getattr (self ._client , request_type ) if request_type else None
36
+ self ._callback = getattr (self ._client , method ) if method else None
28
37
self ._url = url
29
38
self ._body = body
30
39
if self ._callback is not None :
@@ -77,6 +86,30 @@ async def _():
77
86
78
87
return await _ ()
79
88
89
+ async def _stream (
90
+ self , method : Literal ["GET" ], url : str , * args , ** kwargs
91
+ ) -> AsyncGenerator [bytes , None ]:
92
+ n_attempts = self .configuration .retries .total
93
+ assert isinstance (n_attempts , int )
94
+
95
+ @tenacity .retry (
96
+ reraise = True ,
97
+ wait = self ._wait_callback ,
98
+ stop = tenacity .stop_after_attempt (n_attempts ),
99
+ retry = tenacity .retry_if_exception_type (httpx .HTTPStatusError ),
100
+ )
101
+ async def _ () -> AsyncGenerator [bytes , None ]:
102
+ async with self ._client .stream (
103
+ method = method , url = url , * args , ** kwargs
104
+ ) as response :
105
+ if response .status_code in self .configuration .retries .status_forcelist :
106
+ response .raise_for_status ()
107
+ async for chunk in response .aiter_bytes ():
108
+ yield chunk
109
+
110
+ async for chunk in _ ():
111
+ yield chunk
112
+
80
113
async def put (self , * args , ** kwargs ) -> httpx .Response :
81
114
return await self ._request (self ._client .put , * args , ** kwargs )
82
115
@@ -92,6 +125,12 @@ async def patch(self, *args, **kwargs) -> httpx.Response:
92
125
async def get (self , * args , ** kwargs ) -> httpx .Response :
93
126
return await self ._request (self ._client .get , * args , ** kwargs )
94
127
128
+ async def stream (
129
+ self , method : Literal ["GET" ], url : str , * args , ** kwargs
130
+ ) -> AsyncGenerator [bytes , None ]:
131
+ async for chunk in self ._stream (method = method , url = url , * args , ** kwargs ):
132
+ yield chunk
133
+
95
134
def _wait_callback (self , retry_state : tenacity .RetryCallState ) -> int :
96
135
assert retry_state .outcome is not None
97
136
if retry_state .outcome and retry_state .outcome .exception ():
0 commit comments