Examples

Fetching urls concurrently, and processing the responses as completed, is used as an example. This simple task is nonetheless surprisingly tedious, especially using asyncio.

Threaded

from concurrent import futures

import httpx2 as httpx

urls = [f"https://httpbin.org/delay/{d}" for d in (0.6, 0.3, 0.0)]


def fetch_all(urls):
    with httpx.Client() as client, futures.ThreadPoolExecutor() as executor:
        fs = [executor.submit(client.get, url) for url in urls]
        for future in futures.as_completed(fs):
            yield future.result()


for resp in fetch_all(urls):
    print(resp.url)
https://httpbin.org/delay/0.0
https://httpbin.org/delay/0.3
https://httpbin.org/delay/0.6

futured.threaded abstracts away the boilerplate.

from futured import threaded

fetch = threaded(httpx.Client().get)
for resp in fetch.map(urls, as_completed=True):
    print(resp.url)
https://httpbin.org/delay/0.0
https://httpbin.org/delay/0.3
https://httpbin.org/delay/0.6

Asynced

import asyncio


async def fetch_all(urls):
    async with httpx.AsyncClient() as client:
        for future in asyncio.as_completed(map(client.get, urls)):
            yield await future


for resp in fetch_all(urls):
    print(resp.url)
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Cell In[3], line 10
      6         for future in asyncio.as_completed(map(client.get, urls)):
      7             yield await future
      8 
      9 
---> 10 for resp in fetch_all(urls):
     11     print(resp.url)

TypeError: 'async_generator' object is not iterable

The problem is coroutines support the yield keyword, but only to create async iterators. Even though asyncio.as_completed is itself a normal iterator, there is no way to write this generator as intended.

Additionally there is no iterator equivalent of loop.run_until_complete, to mitigate the viral nature of the async keyword. So futured.asynced provides one.

from futured import asynced

for resp in asynced.run(fetch_all, urls):
    ...

The alternative approach is to explicitly handle the loop in the implementation.

def fetch_all(urls):
    loop = asyncio.new_event_loop()
    client = httpx.AsyncClient()
    pending = [loop.create_task(client.get(url)) for url in urls]
    while pending:
        done, pending = loop.run_until_complete(
            asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        )
        for future in done:
            yield future.result()
    loop.run_until_complete(client.aclose())

For this case, asynced provides the same abstraction as threaded.

fetch = asynced(httpx.AsyncClient().get)
for resp in fetch.map(urls, as_completed=True):
    ...