In [8]:
Copied!
from concurrent import futures
import httpx

urls = [f'https://httpbin.org/delay/{d}' for d in (0.6, 0.3, 0.0)]

def fetch_all(urls):
    # Share one HTTP client and one thread pool across all requests,
    # yielding each response as soon as it completes (not in input order).
    with httpx.Client() as client, futures.ThreadPoolExecutor() as executor:
        fs = [executor.submit(client.get, url) for url in urls]
        for future in futures.as_completed(fs):
            yield future.result()

for resp in fetch_all(urls):
    print(resp.url)
from concurrent import futures
import httpx

urls = [f'https://httpbin.org/delay/{d}' for d in (0.6, 0.3, 0.0)]

def fetch_all(urls):
    # Share one HTTP client and one thread pool across all requests,
    # yielding each response as soon as it completes (not in input order).
    with httpx.Client() as client, futures.ThreadPoolExecutor() as executor:
        fs = [executor.submit(client.get, url) for url in urls]
        for future in futures.as_completed(fs):
            yield future.result()

for resp in fetch_all(urls):
    print(resp.url)
https://httpbin.org/delay/0.0 https://httpbin.org/delay/0.3 https://httpbin.org/delay/0.6
futured.threaded abstracts away the boilerplate.
In [9]:
Copied!
from futured import threaded

# `threaded` wraps the bound method so `.map(..., as_completed=True)`
# replaces the executor/submit/as_completed boilerplate above.
fetch = threaded(httpx.Client().get)
for resp in fetch.map(urls, as_completed=True):
    print(resp.url)
from futured import threaded

# `threaded` wraps the bound method so `.map(..., as_completed=True)`
# replaces the executor/submit/as_completed boilerplate above.
fetch = threaded(httpx.Client().get)
for resp in fetch.map(urls, as_completed=True):
    print(resp.url)
https://httpbin.org/delay/0.0 https://httpbin.org/delay/0.3 https://httpbin.org/delay/0.6
Asynced¶
In [10]:
Copied!
import asyncio
import httpx

async def fetch_all(urls):
    async with httpx.AsyncClient() as client:
        for future in asyncio.as_completed(map(client.get, urls)):
            yield await future

# NOTE: intentional failure — `fetch_all` is an async generator,
# which a plain `for` loop cannot iterate (see the TypeError below).
for resp in fetch_all(urls):
    print(resp.url)
import asyncio
import httpx

async def fetch_all(urls):
    async with httpx.AsyncClient() as client:
        for future in asyncio.as_completed(map(client.get, urls)):
            yield await future

# NOTE: intentional failure — `fetch_all` is an async generator,
# which a plain `for` loop cannot iterate (see the TypeError below).
for resp in fetch_all(urls):
    print(resp.url)
--------------------------------------------------------------------------- TypeError Traceback (most recent call last) Cell In[10], line 11 7 for future in asyncio.as_completed(map(client.get, urls)): 8 yield await future ---> 11 for resp in fetch_all(urls): 12 print(resp.url) TypeError: 'async_generator' object is not iterable
The problem is that coroutines support the yield keyword, but only to create async iterators. Even though asyncio.as_completed is itself a normal iterator, there is no way to write this generator as intended. Additionally, there is no iterator equivalent of loop.run_until_complete to mitigate the viral nature of the async keyword.
So futured.asynced provides one.
In [ ]:
Copied!
from futured import asynced

# `asynced.run` drives the async generator from synchronous code,
# acting as the iterator equivalent of `loop.run_until_complete`.
for resp in asynced.run(fetch_all, urls):
    print(resp.url)
from futured import asynced

# `asynced.run` drives the async generator from synchronous code,
# acting as the iterator equivalent of `loop.run_until_complete`.
for resp in asynced.run(fetch_all, urls):
    print(resp.url)
The alternative approach is to explicitly handle the loop in the implementation.
In [ ]:
Copied!
def fetch_all(urls):
    # Manually drive an event loop from a sync generator: run the loop
    # in bursts, yielding each response as its task finishes.
    loop = asyncio.new_event_loop()
    client = httpx.AsyncClient()
    pending = [loop.create_task(client.get(url)) for url in urls]
    while pending:
        done, pending = loop.run_until_complete(
            asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        )
        for future in done:
            yield future.result()
    loop.run_until_complete(client.aclose())

for resp in fetch_all(urls):
    print(resp.url)
def fetch_all(urls):
    # Manually drive an event loop from a sync generator: run the loop
    # in bursts, yielding each response as its task finishes.
    loop = asyncio.new_event_loop()
    client = httpx.AsyncClient()
    pending = [loop.create_task(client.get(url)) for url in urls]
    while pending:
        done, pending = loop.run_until_complete(
            asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        )
        for future in done:
            yield future.result()
    loop.run_until_complete(client.aclose())

for resp in fetch_all(urls):
    print(resp.url)
For this case, asynced provides the same abstraction as threaded.
In [ ]:
Copied!
# Same abstraction as `threaded`, but over coroutines and an event loop.
fetch = asynced(httpx.AsyncClient().get)
for resp in fetch.map(urls, as_completed=True):
    print(resp.url)
# Same abstraction as `threaded`, but over coroutines and an event loop.
fetch = asynced(httpx.AsyncClient().get)
for resp in fetch.map(urls, as_completed=True):
    print(resp.url)