Skip to content

Reference

futured.futured

Bases: partial

A partial function which returns futures.

Source code in futured/__init__.py
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
class futured(partial):
    """Partial application of a callable whose calls produce futures."""

    # Placeholder; `results` and `items` call this through `cls` to iterate in completion order.
    as_completed: Callable = NotImplemented

    def __get__(self, instance, owner):
        # Descriptor hook: bind like a method on instances, stay unbound on the class.
        if instance is None:
            return self
        return types.MethodType(self, instance)

    @classmethod
    def results(cls, fs: Iterable, *, as_completed=False, **kwargs) -> Iterator:
        """Lazily produce the result of each future, in input order by default.

        Args:
            fs: iterable of futures
            as_completed: produce results in completion order instead of input order
            **kwargs: as completed options, e.g., timeout; passing any implies completion order
        """
        if as_completed or kwargs:
            pending = cls.as_completed(fs, **kwargs)
        else:
            # Consume the whole iterable now; results are still fetched lazily by the map below.
            pending = list(fs)
        return map(operator.methodcaller('result'), pending)

    @classmethod
    def items(cls, pairs: Iterable, **kwargs) -> Iterator:
        """Produce (key, result) tuples in completion order.

        Args:
            pairs: iterable of (key, future) pairs
            **kwargs: as completed options, e.g., timeout
        """
        # Flip each pair so the future becomes the lookup key for its label.
        by_future = dict(reversed(pair) for pair in pairs)  # type: ignore
        completed = cls.as_completed(by_future, **kwargs)
        return ((by_future[future], future.result()) for future in completed)

    def map(self, *iterables: Iterable, **kwargs) -> Iterator:
        """Call the function across the iterables and produce the results.

        Args:
            **kwargs: keyword options for [results][futured.futured.results]
        """
        submitted = map(self, *iterables)
        return self.results(submitted, **kwargs)

    def starmap(self, iterable: Iterable, **kwargs) -> Iterator:
        """Call the function with each argument tuple unpacked and produce the results.

        Args:
            **kwargs: keyword options for [results][futured.futured.results]
        """
        submitted = itertools.starmap(self, iterable)
        return self.results(submitted, **kwargs)

    def mapzip(self, iterable: Iterable, **kwargs) -> Iterator:
        """Produce (arg, result) tuples in completion order.

        Args:
            **kwargs: keyword options for [items][futured.futured.items]
        """
        pairs = ((arg, self(arg)) for arg in iterable)
        return self.items(pairs, **kwargs)

    @classmethod
    @contextlib.contextmanager
    def waiting(cls, *fs, **kwargs):
        """Return context manager which waits on [results][futured.futured.results].

        The managed value is a list of the given futures; on exit its contents are
        replaced in place by their results.
        """
        pending = list(fs)
        try:
            yield pending
        finally:
            # Runs on normal and exceptional exit alike.
            pending[:] = cls.results(pending, **kwargs)

    class tasks(set):
        """Set of futures that iterates in completion order and tolerates additions mid-iteration."""

        # Looked up through `self`, so a subclass may substitute its own wait and timeout error.
        wait = staticmethod(futures.wait)
        TimeoutError = futures.TimeoutError

        def __init__(self, fs: Iterable, *, timeout=None):
            super().__init__(fs)
            self.options = {'return_when': 'FIRST_COMPLETED', 'timeout': timeout}
            # One generator backs the iterator protocol, so iteration state lives on the instance.
            self.it = self.iter()

        def iter(self):
            """Generate futures as they finish until the set is empty.

            Raises:
                TimeoutError: if a wait returns with nothing completed
            """
            while self:
                # `__iter__` is overridden to return the set itself, so snapshot via the base class.
                snapshot = [*super().__iter__()]
                finished, _pending = self.wait(snapshot, **self.options)
                if not finished:
                    raise self.TimeoutError
                # Remove before yielding so futures added meanwhile are seen by the next wait.
                self -= finished
                for future in finished:
                    yield future

        def __iter__(self):
            return self

        def __next__(self):
            return next(self.it)

tasks

Bases: set

A set of futures which iterate as completed, and can be updated while iterating.

Source code in futured/__init__.py
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
class tasks(set):
    """Set of futures that iterates in completion order and tolerates additions mid-iteration."""

    # Looked up through `self`, so a subclass may substitute its own wait and timeout error.
    wait = staticmethod(futures.wait)
    TimeoutError = futures.TimeoutError

    def __init__(self, fs: Iterable, *, timeout=None):
        super().__init__(fs)
        self.options = {'return_when': 'FIRST_COMPLETED', 'timeout': timeout}
        # One generator backs the iterator protocol, so iteration state lives on the instance.
        self.it = self.iter()

    def iter(self):
        """Generate futures as they finish until the set is empty.

        Raises:
            TimeoutError: if a wait returns with nothing completed
        """
        while self:
            # `__iter__` is overridden to return the set itself, so snapshot via the base class.
            snapshot = [*super().__iter__()]
            finished, _pending = self.wait(snapshot, **self.options)
            if not finished:
                raise self.TimeoutError
            # Remove before yielding so futures added meanwhile are seen by the next wait.
            self -= finished
            for future in finished:
                yield future

    def __iter__(self):
        return self

    def __next__(self):
        return next(self.it)

items(pairs, **kwargs) classmethod

Generate key, result pairs as completed from futures.

Parameters:

Name Type Description Default
pairs Iterable

key, future pairs

required
**kwargs

as completed options, e.g., timeout

{}
Source code in futured/__init__.py
34
35
36
37
38
39
40
41
42
43
@classmethod
def items(cls, pairs: Iterable, **kwargs) -> Iterator:
    """Produce (key, result) tuples in completion order.

    Args:
        pairs: iterable of (key, future) pairs
        **kwargs: as completed options, e.g., timeout
    """
    # Flip each pair so the future becomes the lookup key for its label.
    by_future = dict(reversed(pair) for pair in pairs)  # type: ignore
    completed = cls.as_completed(by_future, **kwargs)
    return ((by_future[future], future.result()) for future in completed)

map(*iterables, **kwargs)

Asynchronously map function.

Parameters:

Name Type Description Default
**kwargs

keyword options for results

{}
Source code in futured/__init__.py
45
46
47
48
49
50
51
def map(self, *iterables: Iterable, **kwargs) -> Iterator:
    """Call the function across the iterables and produce the results.

    Args:
        **kwargs: keyword options for [results][futured.futured.results]
    """
    submitted = map(self, *iterables)
    return self.results(submitted, **kwargs)

mapzip(iterable, **kwargs)

Generate arg, result pairs as completed.

Parameters:

Name Type Description Default
**kwargs

keyword options for items

{}
Source code in futured/__init__.py
61
62
63
64
65
66
67
def mapzip(self, iterable: Iterable, **kwargs) -> Iterator:
    """Produce (arg, result) tuples in completion order.

    Args:
        **kwargs: keyword options for [items][futured.futured.items]
    """
    pairs = ((arg, self(arg)) for arg in iterable)
    return self.items(pairs, **kwargs)

results(fs, *, as_completed=False, **kwargs) classmethod

Generate results concurrently from futures, by default in order.

Parameters:

Name Type Description Default
fs Iterable

iterable of futures

required
as_completed kwargs

generate results as completed with options, e.g., timeout

False
Source code in futured/__init__.py
23
24
25
26
27
28
29
30
31
32
@classmethod
def results(cls, fs: Iterable, *, as_completed=False, **kwargs) -> Iterator:
    """Lazily produce the result of each future, in input order by default.

    Args:
        fs: iterable of futures
        as_completed: produce results in completion order instead of input order
        **kwargs: as completed options, e.g., timeout; passing any implies completion order
    """
    if as_completed or kwargs:
        pending = cls.as_completed(fs, **kwargs)
    else:
        # Consume the whole iterable now; results are still fetched lazily by the map below.
        pending = list(fs)
    return map(operator.methodcaller('result'), pending)

starmap(iterable, **kwargs)

Asynchronously starmap function.

Parameters:

Name Type Description Default
**kwargs

keyword options for results

{}
Source code in futured/__init__.py
53
54
55
56
57
58
59
def starmap(self, iterable: Iterable, **kwargs) -> Iterator:
    """Call the function with each argument tuple unpacked and produce the results.

    Args:
        **kwargs: keyword options for [results][futured.futured.results]
    """
    submitted = itertools.starmap(self, iterable)
    return self.results(submitted, **kwargs)

waiting(*fs, **kwargs) classmethod

Return context manager which waits on results.

Source code in futured/__init__.py
69
70
71
72
73
74
75
76
77
@classmethod
@contextlib.contextmanager
def waiting(cls, *fs, **kwargs):
    """Return context manager which waits on [results][futured.futured.results].

    The managed value is a list of the given futures; on exit its contents are
    replaced in place by their results.
    """
    pending = list(fs)
    try:
        yield pending
    finally:
        # Runs on normal and exceptional exit alike.
        pending[:] = cls.results(pending, **kwargs)

futured.threaded

Bases: executed

A partial function executed in its own thread pool.

Source code in futured/__init__.py
123
124
125
126
class threaded(executed):
    """A partial function executed in its own thread pool."""

    # Pool class for this variant; presumably instantiated by the `executed` base (not shown here).
    Executor = futures.ThreadPoolExecutor

futured.processed

Bases: executed

A partial function executed in its own process pool.

Source code in futured/__init__.py
129
130
131
132
class processed(executed):
    """A partial function executed in its own process pool."""

    # Pool class for this variant; presumably instantiated by the `executed` base (not shown here).
    Executor = futures.ProcessPoolExecutor

futured.decorated(base, **decorators)

Return subclass with decorated methods.

Source code in futured/__init__.py
262
263
264
265
def decorated(base: type, **decorators: Callable) -> type:
    """Build a same-named subclass of `base` with the named attributes wrapped.

    Args:
        base: class to derive from
        **decorators: attribute name mapped to the decorator applied to `base`'s attribute of that name

    Returns:
        a new subclass whose own namespace holds only the decorated attributes
    """
    namespace = {}
    for name, decorate in decorators.items():
        namespace[name] = decorate(getattr(base, name))
    return type(base.__name__, (base,), namespace)

futured.asynced

Bases: futured

A partial coroutine.

Anywhere futures are expected, coroutines are also supported.

Source code in futured/__init__.py
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
class asynced(futured):
    """A partial coroutine function.

    Coroutines are accepted wherever the base class expects futures.
    """

    @classmethod
    def results(cls, fs: Iterable, *, as_completed=False, **kwargs) -> Iterator:
        if not (as_completed or kwargs):
            # Ordered mode: schedule every coroutine on a fresh loop, then drive the tasks one by one.
            loop = asyncio.new_event_loop()
            scheduled = [loop.create_task(coro) for coro in fs]
            return map(loop.run_until_complete, scheduled)
        # Completion-order mode delegates to the nested `tasks` set, which creates its own loop.
        return map(operator.methodcaller('result'), cls.tasks(fs, **kwargs))

    @staticmethod
    async def pair(key, future):
        # Await the value while carrying its key along.
        value = await future
        return key, value

    @classmethod
    def items(cls, pairs: Iterable, **kwargs) -> Iterator:
        # Wrap each (key, awaitable) in a coroutine resolving to (key, value), then take them as completed.
        keyed = itertools.starmap(cls.pair, pairs)
        return cls.results(keyed, as_completed=True, **kwargs)

    def run(self: Callable, *args, **kwargs):
        """Call the function and synchronously run the coroutine or asynchronous iterator it returns."""
        outcome = self(*args, **kwargs)
        if isinstance(outcome, AsyncIterable):
            return asynced.iter(outcome)
        return asyncio.run(outcome)

    @staticmethod
    def iter(aiterable: AsyncIterable, loop=None):
        """Expose an asynchronous iterable as a plain synchronous iterator.

        Analogous to `asyncio.run` for coroutines. A new event loop is created
        when `loop` is not supplied.
        """
        loop = loop or asyncio.new_event_loop()
        advance = aiterable.__aiter__().__anext__
        pending = loop.create_task(advance())
        while True:
            try:
                item = loop.run_until_complete(pending)
            except StopAsyncIteration:
                break
            # Schedule the following item before handing this one to the caller.
            pending = loop.create_task(advance())
            yield item

    class tasks(futured.tasks):
        __doc__ = futured.tasks.__doc__
        TimeoutError = asyncio.TimeoutError  # type: ignore

        def __init__(self, coros: Iterable, **kwargs):
            # The loop must exist before the base initializer consumes the mapped tasks.
            self.loop = asyncio.new_event_loop()
            scheduled = map(self.loop.create_task, coros)
            super().__init__(scheduled, **kwargs)

        def add(self, coro):
            # Members are tasks bound to this set's loop, so wrap the coroutine first.
            task = self.loop.create_task(coro)
            super().add(task)

        def wait(self, *args, **kwargs):
            waiter = asyncio.wait(*args, **kwargs)
            return self.loop.run_until_complete(waiter)

iter(aiterable, loop=None) staticmethod

Wrap an asynchronous iterable into an iterator.

Analogous to asyncio.run for coroutines.

Source code in futured/__init__.py
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
@staticmethod
def iter(aiterable: AsyncIterable, loop=None):
    """Expose an asynchronous iterable as a plain synchronous iterator.

    Analogous to `asyncio.run` for coroutines. A new event loop is created
    when `loop` is not supplied.
    """
    loop = loop or asyncio.new_event_loop()
    advance = aiterable.__aiter__().__anext__
    pending = loop.create_task(advance())
    while True:
        try:
            item = loop.run_until_complete(pending)
        except StopAsyncIteration:
            break
        # Schedule the following item before handing this one to the caller.
        pending = loop.create_task(advance())
        yield item

run(*args, **kwargs)

Synchronously call and run coroutine or asynchronous iterator.

Source code in futured/__init__.py
165
166
167
168
def run(self: Callable, *args, **kwargs):
    """Call the function and synchronously run the coroutine or asynchronous iterator it returns."""
    outcome = self(*args, **kwargs)
    if isinstance(outcome, AsyncIterable):
        return asynced.iter(outcome)
    return asyncio.run(outcome)

futured.command

Bases: subprocess.Popen

Asynchronous subprocess with a future compatible interface.

Source code in futured/__init__.py
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
class command(subprocess.Popen):
    """Subprocess started immediately and exposing a future-like `result` method."""

    def __init__(self, *args, **kwargs):
        # Positional arguments form the argument vector; both output streams are always captured.
        super().__init__(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **kwargs)

    def check(self, args, stdout, stderr):
        # Also invoked with an asyncio process as `self`; only `returncode` is read from it.
        if not self.returncode:
            return stdout
        raise subprocess.CalledProcessError(self.returncode, args, stdout, stderr)

    @classmethod
    async def coroutine(cls, *args, shell=False, **kwargs):
        """Run the command as an asyncio subprocess, which makes it suitable for timeouts.

        Returns captured stdout, or raises `CalledProcessError` on a nonzero exit status.
        """
        if shell:
            spawn = asyncio.create_subprocess_shell
        else:
            spawn = asyncio.create_subprocess_exec
        process = await spawn(*args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **kwargs)
        stdout, stderr = await process.communicate()
        return cls.check(process, args, stdout, stderr)

    def result(self, **kwargs) -> AnyStr:
        """Wait for the process and return its stdout.

        Keyword arguments are forwarded to `communicate`. Raises `CalledProcessError`,
        carrying stdout and stderr, on a nonzero exit status.
        """
        stdout, stderr = self.communicate(**kwargs)
        return self.check(self.args, stdout, stderr)

    def pipe(self, *args, **kwargs) -> 'command':
        """Start another command of the same type reading this command's stdout as its stdin."""
        cls = type(self)
        return cls(*args, stdin=self.stdout, **kwargs)

    def __or__(self, other: Iterable) -> 'command':
        """Alias of [pipe][futured.command.pipe]."""
        argv = tuple(other)
        return self.pipe(*argv)

    def __iter__(self):
        """Return an iterator over the lines of the command's output."""
        lines = self.result().splitlines()
        return iter(lines)

__iter__()

Return output lines.

Source code in futured/__init__.py
232
233
234
def __iter__(self):
    """Return an iterator over the lines of the command's output."""
    lines = self.result().splitlines()
    return iter(lines)

__or__(other)

Alias of pipe.

Source code in futured/__init__.py
228
229
230
def __or__(self, other: Iterable) -> 'command':
    """Alias of [pipe][futured.command.pipe]."""
    argv = tuple(other)
    return self.pipe(*argv)

coroutine(*args, shell=False, **kwargs) classmethod async

Create a subprocess coroutine, suitable for timeouts.

Source code in futured/__init__.py
213
214
215
216
217
218
@classmethod
async def coroutine(cls, *args, shell=False, **kwargs):
    """Run the command as an asyncio subprocess, which makes it suitable for timeouts.

    The captured output is passed through `cls.check`, whose return value is returned.
    """
    if shell:
        spawn = asyncio.create_subprocess_shell
    else:
        spawn = asyncio.create_subprocess_exec
    process = await spawn(*args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **kwargs)
    stdout, stderr = await process.communicate()
    return cls.check(process, args, stdout, stderr)

pipe(*args, **kwargs)

Pipe stdout to the next command's stdin.

Source code in futured/__init__.py
224
225
226
def pipe(self, *args, **kwargs) -> 'command':
    """Start another command of the same type reading this command's stdout as its stdin."""
    cls = type(self)
    return cls(*args, stdin=self.stdout, **kwargs)

result(**kwargs)

Return stdout or raise stderr.

Source code in futured/__init__.py
220
221
222
def result(self, **kwargs) -> AnyStr:
    """Wait for the process via `communicate` and return what `check` makes of its output.

    Keyword arguments are forwarded to `communicate`.
    """
    stdout, stderr = self.communicate(**kwargs)
    return self.check(self.args, stdout, stderr)

futured.forked(values, max_workers=None)

Generate each value in its own child process and wait in the parent.

Source code in futured/__init__.py
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
def forked(values: Iterable, max_workers: Optional[int] = None) -> Iterator:
    """Yield each value inside its own forked child while the parent waits for the children.

    The loop body that consumes a value runs in the child, which exits once the
    generator is resumed. In the parent nothing is yielded; it forks and reaps only.

    Args:
        values: values to hand out, one per child process
        max_workers: cap on concurrently running children; defaults to the CPU count

    Raises:
        OSError: in the parent, with the wait status and the value, when a child exits nonzero
    """
    limit = max_workers or os.cpu_count() or 1  # same default as ProcessPoolExecutor
    running: dict = {}

    def reap():
        # Block for any child to exit; pids this call did not start are ignored.
        pid, status = os.wait()
        if pid not in running:
            return
        value = running.pop(pid)
        if status:
            raise OSError(status, value)

    for value in values:
        # Throttle: wait for a slot before forking another child.
        while len(running) >= limit:
            reap()
        child = os.fork()
        if not child:  # pragma: no cover
            yield value
            # Resumed after the consumer's loop body: terminate the child immediately.
            os._exit(0)
        running[child] = value
    while running:
        reap()