Skip to content

Reference

futured.futured

Bases: partial

A partial function which returns futures.

Source code in futured/__init__.py
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
class futured(partial):
    """A partial function which returns futures."""

    # Hook for subclasses: generates futures as completed (e.g. `futures.as_completed`).
    as_completed: Callable = NotImplemented

    def __get__(self, instance, owner):
        # Descriptor support so a futured can be used as a method.
        return self if instance is None else types.MethodType(self, instance)

    @classmethod
    def results(cls, fs: Iterable, *, as_completed=False, **kwargs) -> Iterator:
        """Generate results concurrently from futures, by default in order.

        Args:
            fs: iterable of futures
            as_completed: generate results as completed instead of in order
            **kwargs: as completed options, e.g., timeout
        """
        # Any keyword option (e.g. timeout) implies as-completed ordering.
        tasks = cls.as_completed(fs, **kwargs) if (as_completed or kwargs) else list(fs)
        return map(operator.methodcaller('result'), tasks)

    @classmethod
    def items(cls, pairs: Iterable, **kwargs) -> Iterator:
        """Generate key, result pairs as completed from futures.

        Args:
            pairs: key, future pairs
            **kwargs: as completed options, e.g., timeout
        """
        # Index keys by future. A comprehension supports pairs which are
        # arbitrary 2-item iterables, whereas `reversed` required sequences.
        keys = {future: key for key, future in pairs}
        return ((keys[future], future.result()) for future in cls.as_completed(keys, **kwargs))

    def map(self, *iterables: Iterable, **kwargs) -> Iterator:
        """Asynchronously map function.

        Args:
            **kwargs: keyword options for [results][futured.futured.results]
        """
        return self.results(map(self, *iterables), **kwargs)

    def starmap(self, iterable: Iterable, **kwargs) -> Iterator:
        """Asynchronously starmap function.

        Args:
            **kwargs: keyword options for [results][futured.futured.results]
        """
        return self.results(itertools.starmap(self, iterable), **kwargs)

    def mapzip(self, iterable: Iterable, **kwargs) -> Iterator:
        """Generate arg, result pairs as completed.

        Args:
            **kwargs: keyword options for [items][futured.futured.items]
        """
        return self.items(((arg, self(arg)) for arg in iterable), **kwargs)

    @classmethod
    @contextlib.contextmanager
    def waiting(cls, *fs, **kwargs):
        """Return context manager which waits on [results][futured.futured.results]."""
        fs = list(fs)
        try:
            yield fs
        finally:
            # Replace the futures in-place with their results on exit.
            fs[:] = cls.results(fs, **kwargs)

    class tasks(set):
        """A set of futures which iterate as completed, and can be updated while iterating."""

        TimeoutError = futures.TimeoutError

        def __init__(self, fs: Iterable, *, timeout=None):
            super().__init__(fs)
            self.timeout = timeout
            self.it = self.iter()

        def wait(self, fs: list) -> Iterable:
            # Block until at least one future completes; return the done subset.
            return futures.wait(fs, self.timeout, return_when='FIRST_COMPLETED').done

        def iter(self):
            while self:
                done = self.wait(list(super().__iter__()))
                if not done:
                    # Nothing completed within the timeout.
                    raise self.TimeoutError
                self.difference_update(done)
                yield from done

        def __iter__(self):
            return self

        def __next__(self):
            return next(self.it)

tasks

Bases: set

A set of futures which iterate as completed, and can be updated while iterating.

Source code in futured/__init__.py
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
class tasks(set):
    """A set of futures which iterate as completed, and can be updated while iterating."""

    TimeoutError = futures.TimeoutError

    def __init__(self, fs: Iterable, *, timeout=None):
        super().__init__(fs)
        self.timeout = timeout
        self.it = self.iter()

    def wait(self, fs: list) -> Iterable:
        # Block until at least one future completes; return the done subset.
        done, _ = futures.wait(fs, self.timeout, return_when='FIRST_COMPLETED')
        return done

    def iter(self):
        # Generator which drains completed futures until the set is empty.
        while self:
            pending = list(super().__iter__())
            finished = self.wait(pending)
            if not finished:
                # Nothing completed within the timeout.
                raise self.TimeoutError
            self.difference_update(finished)
            yield from finished

    def __iter__(self):
        return self

    def __next__(self):
        return next(self.it)

items(pairs, **kwargs) classmethod

Generate key, result pairs as completed from futures.

Parameters:

Name Type Description Default
pairs Iterable

key, future pairs

required
**kwargs

as completed options, e.g., timeout

{}
Source code in futured/__init__.py
33
34
35
36
37
38
39
40
41
42
@classmethod
def items(cls, pairs: Iterable, **kwargs) -> Iterator:
    """Generate key, result pairs as completed from futures.

    Args:
        pairs: key, future pairs
        **kwargs: as completed options, e.g., timeout
    """
    # Index keys by future. A comprehension supports pairs which are
    # arbitrary 2-item iterables, whereas `reversed` required sequences.
    keys = {future: key for key, future in pairs}
    return ((keys[future], future.result()) for future in cls.as_completed(keys, **kwargs))

map(*iterables, **kwargs)

Asynchronously map function.

Parameters:

Name Type Description Default
**kwargs

keyword options for results

{}
Source code in futured/__init__.py
44
45
46
47
48
49
50
def map(self, *iterables: Iterable, **kwargs) -> Iterator:
    """Asynchronously map function.

    Args:
        **kwargs: keyword options for [results][futured.futured.results]
    """
    return self.results(map(self, *iterables), **kwargs)

mapzip(iterable, **kwargs)

Generate arg, result pairs as completed.

Parameters:

Name Type Description Default
**kwargs

keyword options for items

{}
Source code in futured/__init__.py
60
61
62
63
64
65
66
def mapzip(self, iterable: Iterable, **kwargs) -> Iterator:
    """Generate arg, result pairs as completed.

    Args:
        **kwargs: keyword options for [items][futured.futured.items]
    """
    # Pair each argument with its future lazily; `items` resolves them as completed.
    pairs = ((arg, self(arg)) for arg in iterable)
    return self.items(pairs, **kwargs)

results(fs, *, as_completed=False, **kwargs) classmethod

Generate results concurrently from futures, by default in order.

Parameters:

Name Type Description Default
fs Iterable

iterable of futures

required
as_completed kwargs

generate results as completed with options, e.g., timeout

False
Source code in futured/__init__.py
22
23
24
25
26
27
28
29
30
31
@classmethod
def results(cls, fs: Iterable, *, as_completed=False, **kwargs) -> Iterator:
    """Generate results concurrently from futures, by default in order.

    Args:
        fs: iterable of futures
        as_completed: generate results as completed instead of in order
        **kwargs: as completed options, e.g., timeout
    """
    # Any keyword option (e.g. timeout) implies as-completed ordering.
    if as_completed or kwargs:
        tasks = cls.as_completed(fs, **kwargs)
    else:
        tasks = list(fs)
    return (task.result() for task in tasks)

starmap(iterable, **kwargs)

Asynchronously starmap function.

Parameters:

Name Type Description Default
**kwargs

keyword options for results

{}
Source code in futured/__init__.py
52
53
54
55
56
57
58
def starmap(self, iterable: Iterable, **kwargs) -> Iterator:
    """Asynchronously starmap function.

    Args:
        **kwargs: keyword options for [results][futured.futured.results]
    """
    # Unpack each argument tuple into a call; defer to `results` to resolve.
    fs = (self(*args) for args in iterable)
    return self.results(fs, **kwargs)

waiting(*fs, **kwargs) classmethod

Return context manager which waits on results.

Source code in futured/__init__.py
68
69
70
71
72
73
74
75
76
@classmethod
@contextlib.contextmanager
def waiting(cls, *fs, **kwargs):
    """Return context manager which waits on [results][futured.futured.results]."""
    pending = list(fs)
    try:
        yield pending
    finally:
        # Replace the futures in-place with their results on exit.
        pending[:] = cls.results(pending, **kwargs)

futured.threaded

Bases: executed

A partial function executed in its own thread pool.

Source code in futured/__init__.py
124
125
126
127
class threaded(executed):
    """A partial function executed in its own thread pool."""

    # Pool class instantiated by the `executed` base — threads suit I/O-bound work.
    Executor = futures.ThreadPoolExecutor

futured.processed

Bases: executed

A partial function executed in its own process pool.

Source code in futured/__init__.py
130
131
132
133
class processed(executed):
    """A partial function executed in its own process pool."""

    # Pool class instantiated by the `executed` base — processes suit CPU-bound work.
    Executor = futures.ProcessPoolExecutor

futured.asynced

Bases: futured

A partial async coroutine.

Source code in futured/__init__.py
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
class asynced(futured):
    """A partial async coroutine."""

    @classmethod
    def results(cls, fs: Iterable, *, as_completed=False, **kwargs) -> Iterator:
        """Generate results from coroutines, by default in order."""
        # Any keyword option (e.g. timeout) implies as-completed ordering.
        if as_completed or kwargs:
            return map(operator.methodcaller('result'), cls.tasks(fs, **kwargs))
        loop = asyncio.new_event_loop()
        # Schedule every coroutine as a task first, then drive the loop for each
        # in turn; tasks already scheduled can progress while the loop runs.
        tasks = list(map(loop.create_task, fs))
        return map(loop.run_until_complete, tasks)

    @staticmethod
    async def pair(key, future):
        """Return key paired with the awaited result."""
        return key, await future

    @classmethod
    def items(cls, pairs: Iterable, **kwargs) -> Iterator:
        """Generate key, result pairs as completed from coroutines."""
        return cls.results(itertools.starmap(cls.pair, pairs), as_completed=True, **kwargs)

    def run(self: Callable, *args, **kwargs):
        """Synchronously call and run coroutine or asynchronous iterator."""
        coro = self(*args, **kwargs)
        return asynced.iter(coro) if isinstance(coro, AsyncIterable) else asyncio.run(coro)

    @staticmethod
    def iter(aiterable: AsyncIterable, loop=None):
        """Wrap an asynchronous iterable into an iterator.

        Analogous to `asyncio.run` for coroutines.
        """
        loop = loop or asyncio.new_event_loop()
        anext = aiterable.__aiter__().__anext__
        task = loop.create_task(anext())
        while True:
            try:
                result = loop.run_until_complete(task)
            except StopAsyncIteration:
                return
            # Schedule the next step before yielding the current result.
            task = loop.create_task(anext())
            yield result

    class tasks(futured.tasks):
        __doc__ = futured.tasks.__doc__
        TimeoutError = asyncio.TimeoutError  # type: ignore

        def __init__(self, coros: Iterable, **kwargs):
            # A private event loop drives the coroutines as they are added.
            self.loop = asyncio.new_event_loop()
            super().__init__(map(self.loop.create_task, coros), **kwargs)

        def add(self, coro):
            """Add a coroutine, scheduled as a task on the private loop."""
            super().add(self.loop.create_task(coro))

        def wait(self, fs: list) -> Iterable:
            # Override: drive the private loop instead of `futures.wait`.
            coro = asyncio.wait(fs, timeout=self.timeout, return_when='FIRST_COMPLETED')
            return self.loop.run_until_complete(coro)[0]

iter(aiterable, loop=None) staticmethod

Wrap an asynchronous iterable into an iterator.

Analogous to asyncio.run for coroutines.

Source code in futured/__init__.py
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
@staticmethod
def iter(aiterable: AsyncIterable, loop=None):
    """Wrap an asynchronous iterable into an iterator.

    Analogous to `asyncio.run` for coroutines.
    """
    loop = loop or asyncio.new_event_loop()
    anext = aiterable.__aiter__().__anext__
    task = loop.create_task(anext())
    while True:
        try:
            result = loop.run_until_complete(task)
        except StopAsyncIteration:
            return
        task = loop.create_task(anext())
        yield result

run(*args, **kwargs)

Synchronously call and run coroutine or asynchronous iterator.

Source code in futured/__init__.py
163
164
165
166
def run(self: Callable, *args, **kwargs):
    """Synchronously call and run coroutine or asynchronous iterator."""
    coro = self(*args, **kwargs)
    # Async iterators are wrapped lazily; plain coroutines are run to completion.
    if isinstance(coro, AsyncIterable):
        return asynced.iter(coro)
    return asyncio.run(coro)

futured.decorated(base, **decorators)

Return subclass with decorated methods.

Source code in futured/__init__.py
289
290
291
292
def decorated(base: type, **decorators: Callable) -> type:
    """Return subclass with decorated methods."""
    # Apply each decorator to the base's attribute of the same name.
    namespace = {
        name: decorator(getattr(base, name)) for name, decorator in decorators.items()
    }
    return type(base.__name__, (base,), namespace)

futured.command

Bases: Popen

Asynchronous subprocess with a future compatible interface.

Source code in futured/__init__.py
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
class command(subprocess.Popen):
    """Asynchronous subprocess with a future compatible interface."""

    def __init__(self, *args, **kwargs):
        # Capture both streams so `result` can return stdout or raise stderr.
        super().__init__(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **kwargs)

    def check(self, args, stdout, stderr):
        # Mirror `subprocess.check_output`: raise on a non-zero exit status.
        if not self.returncode:
            return stdout
        raise subprocess.CalledProcessError(self.returncode, args, stdout, stderr)

    @classmethod
    async def coroutine(cls, *args, shell=False, **kwargs):
        """Create a subprocess coroutine, suitable for timeouts."""
        if shell:
            create = asyncio.create_subprocess_shell
        else:
            create = asyncio.create_subprocess_exec
        self = await create(*args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **kwargs)
        return cls.check(self, args, *(await self.communicate()))

    def result(self, **kwargs) -> Union[str, bytes]:
        """Return stdout or raise stderr."""
        stdout, stderr = self.communicate(**kwargs)
        return self.check(self.args, stdout, stderr)

    def pipe(self, *args, **kwargs) -> 'command':
        """Pipe stdout to the next command's stdin."""
        return type(self)(*args, stdin=self.stdout, **kwargs)

    def __or__(self, other: Iterable) -> 'command':
        """Alias of [pipe][futured.command.pipe]."""
        return self.pipe(*other)

    def __iter__(self):
        """Return output lines."""
        output = self.result()
        return iter(output.splitlines())

__iter__()

Return output lines.

Source code in futured/__init__.py
260
261
262
def __iter__(self):
    """Return output lines."""
    # `result` returns the full stdout; split it into lines for iteration.
    lines = self.result().splitlines()
    return iter(lines)

__or__(other)

Alias of pipe.

Source code in futured/__init__.py
256
257
258
def __or__(self, other: Iterable) -> 'command':
    """Alias of [pipe][futured.command.pipe]."""
    args = tuple(other)
    return self.pipe(*args)

coroutine(*args, shell=False, **kwargs) async classmethod

Create a subprocess coroutine, suitable for timeouts.

Source code in futured/__init__.py
241
242
243
244
245
246
@classmethod
async def coroutine(cls, *args, shell=False, **kwargs):
    """Create a subprocess coroutine, suitable for timeouts."""
    # Choose the asyncio factory matching the requested invocation style.
    if shell:
        create = asyncio.create_subprocess_shell
    else:
        create = asyncio.create_subprocess_exec
    self = await create(*args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **kwargs)
    return cls.check(self, args, *(await self.communicate()))

pipe(*args, **kwargs)

Pipe stdout to the next command's stdin.

Source code in futured/__init__.py
252
253
254
def pipe(self, *args, **kwargs) -> 'command':
    """Pipe stdout to the next command's stdin."""
    # Construct the next command of the same concrete type, fed by this stdout.
    factory = type(self)
    return factory(*args, stdin=self.stdout, **kwargs)

result(**kwargs)

Return stdout or raise stderr.

Source code in futured/__init__.py
248
249
250
def result(self, **kwargs) -> Union[str, bytes]:
    """Return stdout or raise stderr."""
    stdout, stderr = self.communicate(**kwargs)
    return self.check(self.args, stdout, stderr)

futured.forked(values, max_workers=0)

Generate each value in its own child process and wait in the parent.

Source code in futured/__init__.py
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
def forked(values: Iterable, max_workers: int = 0) -> Iterator:
    """Generate each value in its own child process and wait in the parent."""
    # Same default as ProcessPoolExecutor: one worker per CPU.
    limit = max_workers or os.cpu_count() or 1
    children: dict = {}

    def reap():
        # Block until any child exits; raise if one of ours failed.
        pid, status = os.wait()
        if pid in children:
            value = children.pop(pid)
            if status:
                raise OSError(status, value)

    for value in values:
        # Throttle: never exceed the worker limit.
        while len(children) >= limit:
            reap()
        pid = os.fork()
        if pid:
            children[pid] = value
        else:  # pragma: no cover
            # Child: generate the value, then exit without running cleanup.
            yield value
            os._exit(0)
    while children:
        reap()