Requires: Python >=3.11
Futured provides a consistent interface for concurrent functional programming in Python. It wraps any callable to return a concurrent.futures.Future, wraps any async coroutine with a compatible Task interface, and provides concurrent iterators and context managers for futures.
Usage
threaded, processed
Transform any callable into one which runs in a thread or process pool, and returns a future.
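As a rough illustration of the idea using only the standard library, the sketch below wraps a callable so that each call is submitted to a thread pool and returns a `concurrent.futures.Future`. The names `in_thread`, `_pool`, and `square` are hypothetical, and this is not necessarily how futured implements `threaded`.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from functools import wraps

# Hypothetical shared pool for this sketch; futured manages its own executors.
_pool = ThreadPoolExecutor(max_workers=4)


def in_thread(func):
    """Return a wrapper which submits each call to the pool and returns a Future."""
    @wraps(func)
    def wrapper(*args, **kwargs) -> Future:
        return _pool.submit(func, *args, **kwargs)
    return wrapper


@in_thread
def square(n):
    return n * n


future = square(7)  # returns immediately with a Future
results = [f.result() for f in map(square, range(4))]  # results in submission order
```

Calling `future.result()` blocks until the wrapped call has finished, which is the same contract the futures returned by the wrappers described here follow.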
import httpx2 as httpx
from futured import threaded, processed
fetch = threaded(httpx.Client().get)
fetch(url) # return Future
fs = (fetch(url + path) for path in paths)
threaded.results(fs) # generate results from futures
threaded.results(fs, timeout=...) # generate results as completed
fetch.map(urls) # generate results in order
fetch.map(urls, timeout=...) # generate results as completed
fetch.mapzip(urls) # generate (url, result) pairs as completed
Thread and process pool executors may be used as context managers, customized with options, and reused with different callables.
threaded(max_workers=...)(func, ...)
processed(max_workers=...)(func, ...)
futured classes provide a tasks interface which generalizes futures.as_completed and futures.wait, while allowing the set of tasks to be modified, e.g., for retries.
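To make the retry use case concrete, here is a minimal standard-library sketch of a set of pending futures that shrinks as calls complete and grows again when a failed call is resubmitted. The function `run_with_retries` and its `max_attempts` parameter are assumptions for illustration; the tasks interface itself is not shown here.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def run_with_retries(func, args_list, max_attempts=3):
    """Return {arg: result}, resubmitting calls which raise, up to max_attempts."""
    results = {}
    with ThreadPoolExecutor() as pool:
        # pending maps each running future to its (argument, attempt number)
        pending = {pool.submit(func, arg): (arg, 1) for arg in args_list}
        while pending:
            done, _ = wait(pending, return_when=FIRST_COMPLETED)
            for future in done:
                arg, attempt = pending.pop(future)
                error = future.exception()
                if error is None:
                    results[arg] = future.result()
                elif attempt < max_attempts:
                    # modify the set of running tasks: add a retry
                    pending[pool.submit(func, arg)] = (arg, attempt + 1)
                else:
                    raise error
    return results
```

The key point is that the collection being waited on is mutable while the loop runs, which plain `as_completed` over a fixed list does not allow.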
tasks = threaded.tasks(fs, timeout=...) # mutable set of running tasks which pop as completed
with tasks: # wait for all tasks on exit
asynced
The same interface works for asyncio.
from futured import asynced
fetch = asynced(httpx.AsyncClient().get)
fetch(url) # return coroutine
asynced.results(fs) # generate results from futures
asynced.results(fs, timeout=...) # generate results as completed
fetch.map(urls) # generate results in order
fetch.map(urls, timeout=...) # generate results as completed
fetch.mapzip(urls) # generate (url, result) pairs as completed
asynced provides utilities for calling coroutines from a synchronous context. tasks is similar to trio’s nursery, but in a synchronous with block.
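For readers unfamiliar with what calling coroutines from a synchronous context involves, the following sketch uses only asyncio: a coroutine is run to completion, and an async generator is driven as an ordinary synchronous iterator. The helper `iterate_sync` and the example functions `double` and `countdown` are hypothetical names, not part of futured.

```python
import asyncio


async def double(n):
    await asyncio.sleep(0)
    return n * 2


async def countdown(n):
    while n > 0:
        yield n
        n -= 1
        await asyncio.sleep(0)


def iterate_sync(async_gen):
    """Drive an async generator from synchronous code, one item at a time."""
    async def step():
        return await async_gen.__anext__()

    loop = asyncio.new_event_loop()
    try:
        while True:
            try:
                yield loop.run_until_complete(step())
            except StopAsyncIteration:
                break
    finally:
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()


value = asyncio.run(double(21))  # call and run until complete
items = list(iterate_sync(countdown(3)))  # consume as a synchronous iterator
```

The sketch uses a private event loop per iterator so that the caller never needs to be inside an `async def`.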
asynced.run(async_func, ...) # call and run until complete
asynced.run(async_gen, ...) # call and run synchronous iterator
tasks = asynced.tasks(fs, timeout=...) # mutable set of running tasks which pop as completed
with tasks: # wait for all tasks on exit
extensions
There is also support for dask distributed clients and gevent greenlets.
from futured import distributed, greened
decorators
Naturally futured wrappers can be used as decorators, but arguments can also be partially bound.
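Partial binding here means fixing some arguments when the wrapper is created and supplying the rest at call time. A standard-library analogue, assuming a plain `ThreadPoolExecutor` and a hypothetical `power` function, looks like this:

```python
from concurrent.futures import ThreadPoolExecutor
from functools import partial

pool = ThreadPoolExecutor()


def power(base, exponent=2):
    return base ** exponent


# Bind the function and its first argument now; remaining arguments come per call.
submit_power_of_two = partial(pool.submit, power, 2)
future = submit_power_of_two(exponent=10)
```

Each call to `submit_power_of_two` still returns a future, so bound and unbound wrappers can be used interchangeably.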
@threaded
def slow():
    ...
fetch = threaded(httpx.Client().get, url)
fetch(params=...)
Methods are supported, as well as a decorated utility for automatically subclassing.
from futured import decorated
FutureClient = decorated(httpx.Client, request=threaded)
# equivalent to
class FutureClient(httpx.Client):
    request = threaded(httpx.Client.request)
command
command wraps subprocess.Popen to provide a Future compatible interface.
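The sketch below shows one way to get a similar "stdout or raise" future with the standard library alone. It waits for the process in a worker thread, which is an assumption made for simplicity and not necessarily how command is built; `run_command` is a hypothetical name.

```python
import subprocess
import sys
from concurrent.futures import Future, ThreadPoolExecutor

_pool = ThreadPoolExecutor()


def run_command(*args) -> Future:
    """Run a subprocess in a worker thread; the Future yields stdout or raises."""
    def target():
        # check=True raises CalledProcessError (carrying stderr) on a non-zero exit
        proc = subprocess.run(args, capture_output=True, text=True, check=True)
        return proc.stdout
    return _pool.submit(target)


# Use the current interpreter as a portable stand-in for a shell command.
out = run_command(sys.executable, "-c", "print('hello')").result()
```

A failing command surfaces its error only when `.result()` is called, which matches how exceptions propagate through any `Future`.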
from futured import futured, command
command('ls').result() # return stdout or raise stderr
command('ls').pipe('wc') # pipes into next command, or | ('wc', ...)
for line in command('ls'): # iterable lines
command.coroutine('ls') # return coroutine
futured(command, 'ls') # supports `map` interface
asynced(command.coroutine, 'ls') # supports `map` interface with timeout
forked
forked allows iteration in separate child processes.
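The underlying mechanism is presumably `os.fork`. As a simplified, POSIX-only sketch, the hypothetical function below forks one child per value, runs a callback in the child, and has the parent collect exit codes. Unlike the interface described here, it takes a callback instead of yielding values, and it does not limit the number of concurrent workers.

```python
import os


def run_in_children(values, func):
    """Fork one child per value; each child calls func(value) and then exits."""
    pids = []
    for value in values:
        pid = os.fork()
        if pid == 0:  # in the child process
            status = 0
            try:
                func(value)
            except BaseException:
                status = 1
            finally:
                os._exit(status)  # never fall through into the parent's code
        pids.append(pid)  # in the parent process
    # in the parent, after waiting for every child to exit
    return [os.waitstatus_to_exitcode(os.waitpid(pid, 0)[1]) for pid in pids]
```

Because each child has its own copy of memory, any results must be communicated explicitly, for example through exit codes, pipes, or files.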
from futured import forked
for value in forked(values, max_workers=...):
    # in a child process
# in parent after children have exited
Installation
pip install futured
Tests
100% branch coverage.
pytest [--cov]
