In [1]:
Copied!
import nest_asyncio
nest_asyncio.apply() # only needed for notebook
import nest_asyncio
nest_asyncio.apply() # only needed for notebook
In [2]:
Copied!
from concurrent import futures
import httpx
urls = [f'http://httpbin.org/delay/{d}' for d in (0.2, 0.1, 0.0)]
def fetch_all(urls):
    """Yield a response for every URL, in the order the requests finish.

    Each ``client.get`` call is handed to a thread pool, and results are
    yielded as their futures complete, so the output order need not match
    the order of ``urls``.
    """
    with httpx.Client() as client:
        with futures.ThreadPoolExecutor() as executor:
            submitted = [executor.submit(client.get, url) for url in urls]
            for finished in futures.as_completed(submitted):
                yield finished.result()
for resp in fetch_all(urls):
print(resp.url)
from concurrent import futures
import httpx
urls = [f'http://httpbin.org/delay/{d}' for d in (0.2, 0.1, 0.0)]
def fetch_all(urls):
    """Yield a response for every URL, in the order the requests finish.

    Each ``client.get`` call is handed to a thread pool, and results are
    yielded as their futures complete, so the output order need not match
    the order of ``urls``.
    """
    with httpx.Client() as client:
        with futures.ThreadPoolExecutor() as executor:
            submitted = [executor.submit(client.get, url) for url in urls]
            for finished in futures.as_completed(submitted):
                yield finished.result()
for resp in fetch_all(urls):
print(resp.url)
http://httpbin.org/delay/0.0 http://httpbin.org/delay/0.2
http://httpbin.org/delay/0.1
futured.threaded
abstracts away the boilerplate.
In [3]:
Copied!
from futured import threaded
fetch = threaded(httpx.Client().get)
for resp in fetch.map(urls, as_completed=True):
print(resp.url)
from futured import threaded
fetch = threaded(httpx.Client().get)
for resp in fetch.map(urls, as_completed=True):
print(resp.url)
http://httpbin.org/delay/0.0 http://httpbin.org/delay/0.1
http://httpbin.org/delay/0.2
Asynced¶
In [4]:
Copied!
import asyncio
import httpx
async def fetch_all(urls):
    """Asynchronously yield a response for every URL as each request completes.

    This is an async generator: it has to be consumed with ``async for`` (or
    an adapter that drives it), not with a plain ``for`` loop.
    """
    async with httpx.AsyncClient() as client:
        calls = [client.get(url) for url in urls]
        for next_done in asyncio.as_completed(calls):
            response = await next_done
            yield response
for resp in fetch_all(urls):
print(resp.url)
import asyncio
import httpx
async def fetch_all(urls):
    """Asynchronously yield a response for every URL as each request completes.

    This is an async generator: it has to be consumed with ``async for`` (or
    an adapter that drives it), not with a plain ``for`` loop.
    """
    async with httpx.AsyncClient() as client:
        calls = [client.get(url) for url in urls]
        for next_done in asyncio.as_completed(calls):
            response = await next_done
            yield response
for resp in fetch_all(urls):
print(resp.url)
--------------------------------------------------------------------------- TypeError Traceback (most recent call last) Cell In[4], line 11 7 for future in asyncio.as_completed(map(client.get, urls)): 8 yield await future ---> 11 for resp in fetch_all(urls): 12 print(resp.url) TypeError: 'async_generator' object is not iterable
The problem is that coroutines support the `yield` keyword, but only to create async iterators.
Even though `asyncio.as_completed` is itself a normal iterator, there is no way to write this generator as intended.
Additionally, there is no iterator equivalent of `loop.run_until_complete` to mitigate the viral nature of the `async` keyword.
So futured.asynced
provides one.
In [5]:
Copied!
from futured import asynced
for resp in asynced.run(fetch_all, urls):
print(resp.url)
from futured import asynced
for resp in asynced.run(fetch_all, urls):
print(resp.url)
http://httpbin.org/delay/0.0 http://httpbin.org/delay/0.1
http://httpbin.org/delay/0.2
The alternative approach is to explicitly handle the loop in the implementation.
In [6]:
Copied!
def fetch_all(urls):
    """Yield a response for each URL as its request completes, from synchronous code.

    A private event loop drives the requests: every ``client.get`` call is
    scheduled as a task up front, and the loop is run only until at least one
    pending task has finished, whose result is then yielded.

    The client and the loop are released in a ``finally`` block, so they are
    cleaned up even if a request raises or the caller stops iterating early.
    """
    loop = asyncio.new_event_loop()
    client = httpx.AsyncClient()
    pending = ()
    try:
        pending = [loop.create_task(client.get(url)) for url in urls]
        while pending:
            done, pending = loop.run_until_complete(
                asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
            )
            for future in done:
                yield future.result()
    finally:
        # Reached on normal exhaustion, on a failed request, and when the
        # generator is closed before every result has been consumed.
        for task in pending:
            task.cancel()
        if pending:
            # Let the cancellations run so no task is destroyed while pending.
            loop.run_until_complete(asyncio.wait(pending))
        loop.run_until_complete(client.aclose())
        loop.close()
for resp in fetch_all(urls):
print(resp.url)
def fetch_all(urls):
    """Yield a response for each URL as its request completes, from synchronous code.

    A private event loop drives the requests: every ``client.get`` call is
    scheduled as a task up front, and the loop is run only until at least one
    pending task has finished, whose result is then yielded.

    The client and the loop are released in a ``finally`` block, so they are
    cleaned up even if a request raises or the caller stops iterating early.
    """
    loop = asyncio.new_event_loop()
    client = httpx.AsyncClient()
    pending = ()
    try:
        pending = [loop.create_task(client.get(url)) for url in urls]
        while pending:
            done, pending = loop.run_until_complete(
                asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
            )
            for future in done:
                yield future.result()
    finally:
        # Reached on normal exhaustion, on a failed request, and when the
        # generator is closed before every result has been consumed.
        for task in pending:
            task.cancel()
        if pending:
            # Let the cancellations run so no task is destroyed while pending.
            loop.run_until_complete(asyncio.wait(pending))
        loop.run_until_complete(client.aclose())
        loop.close()
for resp in fetch_all(urls):
print(resp.url)
http://httpbin.org/delay/0.0
http://httpbin.org/delay/0.1
http://httpbin.org/delay/0.2
For this case, `asynced` provides the same abstraction as `threaded`.
In [7]:
Copied!
fetch = asynced(httpx.AsyncClient().get)
for resp in fetch.map(urls, as_completed=True):
print(resp.url)
fetch = asynced(httpx.AsyncClient().get)
for resp in fetch.map(urls, as_completed=True):
print(resp.url)
http://httpbin.org/delay/0.1 http://httpbin.org/delay/0.0 http://httpbin.org/delay/0.2