Skip to content

Reference

waiter.waiter

An iterable which sleeps for given delays. Aliased as wait.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| delays | iterable \| number | any iterable of seconds, or a scalar which is repeated endlessly | required |
| timeout | number | optional timeout for iteration | float('inf') |
Source code in waiter/__init__.py
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
class waiter:
    """An iterable which sleeps for given delays. Aliased as `wait`.

    Args:
        delays iterable | number: any iterable of seconds, or a scalar which is repeated endlessly
        timeout number: optional timeout for iteration
    """

    Stats = Stats

    def __init__(self, delays, timeout=float('inf')):
        # `iter` raises TypeError for a scalar; `suppress` records it in `excs`,
        # and a non-empty `excs` means the scalar is repeated endlessly.
        with suppress(TypeError) as excs:
            iter(delays)
        self.delays = itertools.repeat(delays) if excs else delays
        self.timeout = timeout
        self.stats = self.Stats()

    def __iter__(self):
        """Generate a slow loop of elapsed time.

        The first value is yielded immediately; each later value is yielded
        after sleeping for the next delay, capped at the time remaining before
        the timeout. Iteration ends when the delays run out or the timeout has
        passed.
        """
        start = time.time()
        yield self.stats.add(0, 0.0)
        for attempt, delay in enumerate(self.delays, 1):
            remaining = start + self.timeout - time.time()
            if remaining < 0:
                break
            # `time.sleep` raises ValueError for a negative value (e.g. a delay
            # pushed below zero by jitter); clamp so that such a delay means
            # "do not sleep", as the asyncio-based `__aiter__` already allows.
            time.sleep(max(0, min(delay, remaining)))
            yield self.stats.add(attempt, time.time() - start)

    async def __aiter__(self):
        """Asynchronously generate a slow loop of elapsed time."""
        start = time.time()
        yield self.stats.add(0, 0.0)
        for attempt, delay in enumerate(self.delays, 1):
            remaining = start + self.timeout - time.time()
            if remaining < 0:
                break
            await asyncio.sleep(min(delay, remaining))
            yield self.stats.add(attempt, time.time() - start)

    def clone(self, func: Callable, *args) -> 'waiter':
        """Return a new waiter of the same type with the same timeout.

        The new delays are `reiter(func, *args)`.
        """
        return type(self)(reiter(func, *args), self.timeout)

    def map(self, func: Callable, *iterables: Iterable) -> 'waiter':
        """Return new waiter with function mapped across delays."""
        return self.clone(map, func, self.delays, *iterables)

    @classmethod
    def fibonacci(cls, delay, **kwargs) -> 'waiter':
        """Create waiter with fibonacci backoff."""
        return cls(reiter(fibonacci, delay, delay), **kwargs)

    @classmethod
    def count(cls, *args, **kwargs) -> 'waiter':
        """Create waiter based on `itertools.count`."""
        return cls(reiter(itertools.count, *args), **kwargs)

    @classmethod
    def accumulate(cls, *args, **kwargs) -> 'waiter':
        """Create waiter based on `itertools.accumulate`."""
        return cls(reiter(itertools.accumulate, *args), **kwargs)

    @classmethod
    def exponential(cls, base, **kwargs) -> 'waiter':
        """Create waiter with exponential backoff."""
        return cls.count(**kwargs).map(base.__pow__)

    @classmethod
    def polynomial(cls, exp, **kwargs) -> 'waiter':
        """Create waiter with polynomial backoff."""
        return cls.count(**kwargs).map(exp.__rpow__)

    def __getitem__(self, slc: slice) -> 'waiter':
        """Slice delays, e.g., to limit attempt count."""
        return self.clone(itertools.islice, self.delays, slc.start, slc.stop, slc.step)

    def __le__(self, ceiling) -> 'waiter':
        """Limit maximum delay generated."""
        return self.map(partial(min, ceiling))

    def __ge__(self, floor) -> 'waiter':
        """Limit minimum delay generated."""
        return self.map(partial(max, floor))

    def __add__(self, step) -> 'waiter':
        """Generate incremental backoff."""
        return self.map(operator.add, reiter(itertools.count, 0, step))

    def __mul__(self, factor) -> 'waiter':
        """Generate exponential backoff."""
        return self.map(operator.mul, reiter(map, factor.__pow__, reiter(itertools.count)))

    def random(self, start, stop) -> 'waiter':
        """Add random jitter within given range."""
        return self.map(lambda delay: delay + random.uniform(start, stop))

    @singledispatchmethod
    def throttle(self, iterable: Iterable):
        """Delay iteration."""
        # Pairing with `self` makes each item wait for the next waiter tick;
        # only the item from `iterable` is kept.
        return map(operator.itemgetter(1), zip(self, iterable))

    @throttle.register
    async def _(self, iterable: AsyncIterable) -> AsyncIterator:
        """Asynchronously delay iteration of an async iterable."""
        anext = iterable.__aiter__().__anext__
        # Exhaustion of `iterable` ends this generator quietly.
        with suppress(StopAsyncIteration):  # type: ignore
            async for _ in self:
                yield await anext()

    def stream(self, queue: Iterable, size: Optional[int] = None) -> Iterator:
        """Generate chained values in groups from an iterable.

        The queue can be extended while in use.
        """
        it = iter(queue)
        # Repeatedly take up to `size` items until an empty group is produced.
        groups = iter(lambda: list(itertools.islice(it, size)), [])
        if isinstance(queue, Sequence):
            groups = grouped(queue, size)
        return itertools.chain.from_iterable(self.throttle(groups))

    def suppressed(self, exception, func: Callable, iterable: Iterable) -> Iterator[tuple]:
        """Generate `arg, func(arg)` pairs while exception isn't raised."""
        queue = list(iterable)
        for arg in self.stream(queue):
            try:
                yield arg, func(arg)
            except exception:
                # Put the failed argument back on the queue for a later attempt.
                queue.append(arg)

    def filtered(self, predicate: Callable, func: Callable, iterable: Iterable) -> Iterator[tuple]:
        """Generate `arg, func(arg)` pairs while predicate evaluates to true."""
        queue = list(iterable)
        for arg in self.stream(queue):
            result = func(arg)
            if predicate(result):
                yield arg, result
            else:
                # Put the rejected argument back on the queue for a later attempt.
                queue.append(arg)

    def repeat(self, func: Callable, *args, **kwargs):
        """Repeat function call.

        Coroutine functions are delegated to `arepeat`.
        """
        if asyncio.iscoroutinefunction(func):
            return self.arepeat(func, *args, **kwargs)
        return (func(*args, **kwargs) for _ in self)

    async def arepeat(self, func: Callable, *args, **kwargs) -> AsyncIterator:
        """Asynchronous variant of `repeat`: await `func` on each iteration."""
        async for _ in self:
            yield await func(*args, **kwargs)

    def retry(self, exception: Exception, func: Callable, *args, **kwargs):
        """Repeat function call until exception isn't raised.

        Coroutine functions are delegated to `aretry`. If every attempt raises,
        the exception recorded on the last attempt is re-raised.
        """
        if asyncio.iscoroutinefunction(func):
            return self.aretry(exception, func, *args, **kwargs)
        for _ in self:
            with suppress(exception) as excs:
                return func(*args, **kwargs)
        raise excs[0]

    async def aretry(self, exception: Exception, func: Callable, *args, **kwargs):
        """Asynchronous variant of `retry`."""
        async for _ in self:
            with suppress(exception) as excs:
                return await func(*args, **kwargs)
        raise excs[0]

    def poll(self, predicate: Callable, func: Callable, *args, **kwargs):
        """Repeat function call until predicate evaluates to true.

        Coroutine functions are delegated to `apoll`.
        """
        if asyncio.iscoroutinefunction(func):
            return self.apoll(predicate, func, *args, **kwargs)
        return first(predicate, self.repeat(func, *args, **kwargs))

    async def apoll(self, predicate: Callable, func: Callable, *args, **kwargs):
        """Asynchronous variant of `poll`.

        Raises:
            StopAsyncIteration: if no result satisfies the predicate.
        """
        async for result in self.repeat(func, *args, **kwargs):
            if predicate(result):  # pragma: no branch
                return result
        raise StopAsyncIteration

    def repeating(self, func: Callable):
        """A decorator for `repeat`."""
        return partialmethod(self.repeat, func)

    def retrying(self, exception: Exception):
        """Return a decorator for `retry`."""
        return partial(partialmethod, self.retry, exception)

    def polling(self, predicate: Callable):
        """Return a decorator for `poll`."""
        return partial(partialmethod, self.poll, predicate)

__add__(step)

Generate incremental backoff.

Source code in waiter/__init__.py
149
150
151
def __add__(self, step) -> 'waiter':
    """Generate incremental backoff.

    Maps `operator.add` across the delays paired with
    `reiter(itertools.count, 0, step)`.
    """
    increments = reiter(itertools.count, 0, step)
    return self.map(operator.add, increments)

__aiter__() async

Asynchronously generate a slow loop of elapsed time.

Source code in waiter/__init__.py
 94
 95
 96
 97
 98
 99
100
101
102
103
async def __aiter__(self):
    """Asynchronously yield one stats entry per attempt, sleeping in between.

    The first entry is produced immediately; every later one follows an
    `asyncio.sleep` of the next delay, capped at the time left before the
    timeout. The generator finishes once the timeout has passed or the delays
    are exhausted.
    """
    began = time.time()
    yield self.stats.add(0, 0.0)
    for number, pause in enumerate(self.delays, 1):
        left = began + self.timeout - time.time()
        if left < 0:
            return
        await asyncio.sleep(min(pause, left))
        yield self.stats.add(number, time.time() - began)

__ge__(floor)

Limit minimum delay generated.

Source code in waiter/__init__.py
145
146
147
def __ge__(self, floor) -> 'waiter':
    """Return a waiter whose delays are each mapped through `max(floor, delay)`."""
    at_least_floor = partial(max, floor)
    return self.map(at_least_floor)

__getitem__(slc)

Slice delays, e.g., to limit attempt count.

Source code in waiter/__init__.py
137
138
139
def __getitem__(self, slc: slice) -> 'waiter':
    """Return a clone whose delays are an `itertools.islice` of these delays.

    The slice's start, stop and step are forwarded unchanged, so `w[:3]`
    keeps only the first three delays.
    """
    bounds = (slc.start, slc.stop, slc.step)
    return self.clone(itertools.islice, self.delays, *bounds)

__iter__()

Generate a slow loop of elapsed time.

Source code in waiter/__init__.py
83
84
85
86
87
88
89
90
91
92
def __iter__(self):
    """Generate a slow loop of elapsed time.

    The first value is yielded immediately; each later value is yielded after
    sleeping for the next delay, capped at the time remaining before the
    timeout. Iteration ends when the delays run out or the timeout has passed.
    """
    start = time.time()
    yield self.stats.add(0, 0.0)
    for attempt, delay in enumerate(self.delays, 1):
        remaining = start + self.timeout - time.time()
        if remaining < 0:
            break
        # `time.sleep` raises ValueError for a negative value (e.g. a delay
        # pushed below zero by jitter); clamp so such a delay means "no sleep".
        time.sleep(max(0, min(delay, remaining)))
        yield self.stats.add(attempt, time.time() - start)

__le__(ceiling)

Limit maximum delay generated.

Source code in waiter/__init__.py
141
142
143
def __le__(self, ceiling) -> 'waiter':
    """Return a waiter whose delays are each mapped through `min(ceiling, delay)`."""
    at_most_ceiling = partial(min, ceiling)
    return self.map(at_most_ceiling)

__mul__(factor)

Generate exponential backoff.

Source code in waiter/__init__.py
153
154
155
def __mul__(self, factor) -> 'waiter':
    """Generate exponential backoff.

    Maps `operator.mul` across the delays paired with
    `reiter(map, factor.__pow__, reiter(itertools.count))`.
    """
    exponents = reiter(itertools.count)
    powers = reiter(map, factor.__pow__, exponents)
    return self.map(operator.mul, powers)

accumulate(*args, **kwargs) classmethod

Create waiter based on itertools.accumulate.

Source code in waiter/__init__.py
122
123
124
125
@classmethod
def accumulate(cls, *args, **kwargs) -> 'waiter':
    """Create a waiter whose delays come from `itertools.accumulate`.

    Positional arguments are handed to `reiter` together with
    `itertools.accumulate`; keyword arguments go to the waiter constructor.
    """
    running = reiter(itertools.accumulate, *args)
    return cls(running, **kwargs)

count(*args, **kwargs) classmethod

Create waiter based on itertools.count.

Source code in waiter/__init__.py
117
118
119
120
@classmethod
def count(cls, *args, **kwargs) -> 'waiter':
    """Create a waiter whose delays come from `itertools.count`.

    Positional arguments are handed to `reiter` together with
    `itertools.count`; keyword arguments go to the waiter constructor.
    """
    counting = reiter(itertools.count, *args)
    return cls(counting, **kwargs)

exponential(base, **kwargs) classmethod

Create waiter with exponential backoff.

Source code in waiter/__init__.py
127
128
129
130
@classmethod
def exponential(cls, base, **kwargs) -> 'waiter':
    """Create a waiter with exponential backoff.

    Builds `cls.count(**kwargs)` and maps `base.__pow__` across it, so each
    counted value becomes `base` raised to that value.
    """
    counter = cls.count(**kwargs)
    return counter.map(base.__pow__)

fibonacci(delay, **kwargs) classmethod

Create waiter with fibonacci backoff.

Source code in waiter/__init__.py
112
113
114
115
@classmethod
def fibonacci(cls, delay, **kwargs) -> 'waiter':
    """Create a waiter with fibonacci backoff.

    The delays are `reiter(fibonacci, delay, delay)`; keyword arguments go to
    the waiter constructor.
    """
    sequence = reiter(fibonacci, delay, delay)
    return cls(sequence, **kwargs)

filtered(predicate, func, iterable)

Generate arg, func(arg) pairs while predicate evaluates to true.

Source code in waiter/__init__.py
193
194
195
196
197
198
199
200
201
def filtered(self, predicate: Callable, func: Callable, iterable: Iterable) -> Iterator[tuple]:
    """Yield `(arg, func(arg))` for each result accepted by `predicate`.

    Arguments whose result is rejected are appended back onto the working
    queue, which is what `self.stream` is iterating.
    """
    pending = list(iterable)
    for item in self.stream(pending):
        outcome = func(item)
        if not predicate(outcome):
            # Rejected: queue the argument again for a later attempt.
            pending.append(item)
            continue
        yield item, outcome

map(func, *iterables)

Return new waiter with function mapped across delays.

Source code in waiter/__init__.py
108
109
110
def map(self, func: Callable, *iterables: Iterable) -> 'waiter':
    """Return a clone whose delays are `func` mapped over these delays.

    Any extra iterables are passed to the mapping after the delays.
    """
    sources = (self.delays, *iterables)
    return self.clone(map, func, *sources)

poll(predicate, func, *args, **kwargs)

Repeat function call until predicate evaluates to true.

Source code in waiter/__init__.py
228
229
230
231
232
def poll(self, predicate: Callable, func: Callable, *args, **kwargs):
    """Call `func` repeatedly and return the first result accepted by `predicate`.

    Coroutine functions are delegated to `self.apoll`; otherwise the results
    of `self.repeat` are searched with `first`.
    """
    if not asyncio.iscoroutinefunction(func):
        attempts = self.repeat(func, *args, **kwargs)
        return first(predicate, attempts)
    return self.apoll(predicate, func, *args, **kwargs)

polling(predicate)

Return a decorator for poll.

Source code in waiter/__init__.py
248
249
250
def polling(self, predicate: Callable):
    """Return a decorator which wraps a function as a `partialmethod` of `poll`.

    The decorator is `partial(partialmethod, self.poll, predicate)`, so the
    decorated function becomes the `func` argument of `poll`.
    """
    bound_poll = self.poll
    return partial(partialmethod, bound_poll, predicate)

polynomial(exp, **kwargs) classmethod

Create waiter with polynomial backoff.

Source code in waiter/__init__.py
132
133
134
135
@classmethod
def polynomial(cls, exp, **kwargs) -> 'waiter':
    """Create a waiter with polynomial backoff.

    Builds `cls.count(**kwargs)` and maps `exp.__rpow__` across it, so each
    counted value is raised to the power `exp`.
    """
    counter = cls.count(**kwargs)
    return counter.map(exp.__rpow__)

random(start, stop)

Add random jitter within given range.

Source code in waiter/__init__.py
157
158
159
def random(self, start, stop) -> 'waiter':
    """Return a waiter whose delays each get `random.uniform(start, stop)` added."""

    def jittered(delay):
        return delay + random.uniform(start, stop)

    return self.map(jittered)

repeat(func, *args, **kwargs)

Repeat function call.

Source code in waiter/__init__.py
203
204
205
206
207
def repeat(self, func: Callable, *args, **kwargs):
    """Call `func(*args, **kwargs)` once per iteration of this waiter.

    Coroutine functions are delegated to `self.arepeat`; otherwise a generator
    of the call results is returned.
    """
    if not asyncio.iscoroutinefunction(func):
        return (func(*args, **kwargs) for _ in self)
    return self.arepeat(func, *args, **kwargs)

repeating(func)

A decorator for repeat.

Source code in waiter/__init__.py
240
241
242
def repeating(self, func: Callable):
    """Decorate `func` as a `partialmethod` of this waiter's `repeat`."""
    bound_repeat = self.repeat
    return partialmethod(bound_repeat, func)

retry(exception, func, *args, **kwargs)

Repeat function call until exception isn't raised.

Source code in waiter/__init__.py
213
214
215
216
217
218
219
220
def retry(self, exception: Exception, func: Callable, *args, **kwargs):
    """Call `func` on each iteration until it returns without raising `exception`.

    Coroutine functions are delegated to `self.aretry`. If every attempt
    raises, the exception recorded on the last attempt is re-raised.
    """
    if not asyncio.iscoroutinefunction(func):
        for _ in self:
            with suppress(exception) as caught:
                return func(*args, **kwargs)
        # Every attempt failed: `caught` holds the last attempt's exception.
        raise caught[0]
    return self.aretry(exception, func, *args, **kwargs)

retrying(exception)

Return a decorator for retry.

Source code in waiter/__init__.py
244
245
246
def retrying(self, exception: Exception):
    """Return a decorator which wraps a function as a `partialmethod` of `retry`.

    The decorator is `partial(partialmethod, self.retry, exception)`, so the
    decorated function becomes the `func` argument of `retry`.
    """
    bound_retry = self.retry
    return partial(partialmethod, bound_retry, exception)

stream(queue, size=None)

Generate chained values in groups from an iterable.

The queue can be extended while in use.

Source code in waiter/__init__.py
173
174
175
176
177
178
179
180
181
182
def stream(self, queue: Iterable, size: Optional[int] = None) -> Iterator:
    """Generate chained values in groups from an iterable.

    Groups of up to `size` items are passed through `self.throttle` and then
    flattened. Sequences are grouped with `grouped`; other iterables are
    consumed with `itertools.islice` until an empty group appears.

    The queue can be extended while in use.
    """
    source = iter(queue)

    def take():
        return list(itertools.islice(source, size))

    batches = grouped(queue, size) if isinstance(queue, Sequence) else iter(take, [])
    return itertools.chain.from_iterable(self.throttle(batches))

suppressed(exception, func, iterable)

Generate arg, func(arg) pairs while exception isn't raised.

Source code in waiter/__init__.py
184
185
186
187
188
189
190
191
def suppressed(self, exception, func: Callable, iterable: Iterable) -> Iterator[tuple]:
    """Yield `(arg, func(arg))` for each call that does not raise `exception`.

    Arguments whose call raises `exception` are appended back onto the working
    queue, which is what `self.stream` is iterating.
    """
    pending = list(iterable)
    for item in self.stream(pending):
        try:
            pair = item, func(item)
            yield pair
        except exception:
            # Failed: queue the argument again for a later attempt.
            pending.append(item)

throttle(iterable)

Delay iteration.

Source code in waiter/__init__.py
161
162
163
164
@singledispatchmethod
def throttle(self, iterable: Iterable):
    """Delay iteration of `iterable` by pairing each item with a waiter tick.

    The waiter's own values are discarded; only the items are returned.
    """
    paired = zip(self, iterable)
    return map(operator.itemgetter(1), paired)

waiter.suppress(*exceptions)

Variant of contextlib.suppress, which also records exception.

Source code in waiter/__init__.py
21
22
23
24
25
26
27
28
@contextlib.contextmanager
def suppress(*exceptions: Exception):
    """Suppress the given exceptions, like `contextlib.suppress`, and record them.

    The context yields a list which is empty on success and holds the caught
    exception when one of `exceptions` was raised inside the block.
    """
    recorded: list = []
    try:
        yield recorded
    except exceptions as error:  # type: ignore
        recorded.append(error)

waiter.first(predicate, iterable, *default)

Return first item which evaluates to true, like any with filtering.

Source code in waiter/__init__.py
31
32
33
def first(predicate: Callable, iterable: Iterable, *default):
    """Return the first item accepted by `predicate`.

    An optional single default is returned when nothing matches; without it,
    `StopIteration` propagates from `next`.
    """
    matches = filter(predicate, iterable)
    return next(matches, *default)