Skip to content

Reference

waiter.waiter

An iterable which sleeps for given delays. Aliased as wait.

Parameters:

Name Type Description Default
delays iterable | number

any iterable of seconds, or a scalar which is repeated endlessly

required
timeout number

optional timeout for iteration

float('inf')
Source code in waiter/__init__.py
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
class waiter:
    """An iterable which sleeps for given delays. Aliased as `wait`.

    Args:
        delays iterable | number: any iterable of seconds, or a scalar which is repeated endlessly
        timeout number: optional timeout for iteration
    """

    Stats = Stats

    def __init__(self, delays, timeout=float('inf')):
        with suppress(TypeError) as excs:
            iter(delays)
        self.delays = itertools.repeat(delays) if excs else delays
        self.timeout = timeout
        self.stats = self.Stats()

    def __iter__(self):
        """Generate a slow loop of elapsed time."""
        start = time.time()
        yield self.stats.add(0, 0.0)
        for attempt, delay in enumerate(self.delays, 1):
            remaining = start + self.timeout - time.time()
            if remaining < 0:
                break
            time.sleep(min(delay, remaining))
            yield self.stats.add(attempt, time.time() - start)

    async def __aiter__(self):
        """Asynchronously generate a slow loop of elapsed time."""
        start = time.time()
        yield self.stats.add(0, 0.0)
        for attempt, delay in enumerate(self.delays, 1):
            remaining = start + self.timeout - time.time()
            if remaining < 0:
                break
            await asyncio.sleep(min(delay, remaining))
            yield self.stats.add(attempt, time.time() - start)

    def clone(self, func: Callable, *args) -> 'waiter':
        return type(self)(reiter(func, *args), self.timeout)

    def map(self, func: Callable, *iterables: Iterable) -> 'waiter':
        """Return new waiter with function mapped across delays."""
        return self.clone(map, func, self.delays, *iterables)

    @classmethod
    def fibonacci(cls, delay, **kwargs) -> 'waiter':
        """Create waiter with fibonacci backoff."""
        return cls(reiter(fibonacci, delay, delay), **kwargs)

    @classmethod
    def count(cls, *args, **kwargs) -> 'waiter':
        """Create waiter based on `itertools.count`."""
        return cls(reiter(itertools.count, *args), **kwargs)

    @classmethod
    def accumulate(cls, *args, **kwargs) -> 'waiter':
        """Create waiter based on `itertools.accumulate`."""
        return cls(reiter(itertools.accumulate, *args), **kwargs)

    @classmethod
    def exponential(cls, base, **kwargs) -> 'waiter':
        """Create waiter with exponential backoff."""
        return cls.count(**kwargs).map(base.__pow__)

    @classmethod
    def polynomial(cls, exp, **kwargs) -> 'waiter':
        """Create waiter with polynomial backoff."""
        return cls.count(**kwargs).map(exp.__rpow__)

    def __getitem__(self, slc: slice) -> 'waiter':
        """Slice delays, e.g., to limit attempt count."""
        return self.clone(itertools.islice, self.delays, slc.start, slc.stop, slc.step)

    def __le__(self, ceiling) -> 'waiter':
        """Limit maximum delay generated."""
        return self.map(partial(min, ceiling))

    def __ge__(self, floor) -> 'waiter':
        """Limit minimum delay generated."""
        return self.map(partial(max, floor))

    def __add__(self, step) -> 'waiter':
        """Generate incremental backoff."""
        return self.map(operator.add, reiter(itertools.count, 0, step))

    def __mul__(self, factor) -> 'waiter':
        """Generate exponential backoff."""
        return self.map(operator.mul, reiter(map, factor.__pow__, reiter(itertools.count)))

    def random(self, start, stop) -> 'waiter':
        """Add random jitter within given range."""
        return self.map(lambda delay: delay + random.uniform(start, stop))

    @multimethod
    def throttle(self, iterable) -> Iterator:
        """Delay iteration."""
        return map(operator.itemgetter(1), zip(self, iterable))

    @multimethod
    async def throttle(self, iterable: AsyncIterable):
        anext = iterable.__aiter__().__anext__
        with suppress(StopAsyncIteration):
            async for _ in self:
                yield await anext()

    def stream(self, queue: Iterable, size: Optional[int] = None) -> Iterator:
        """Generate chained values in groups from an iterable.

        The queue can be extended while in use.
        """
        it = iter(queue)
        groups = iter(lambda: list(itertools.islice(it, size)), [])
        if isinstance(queue, Sequence):
            groups = grouped(queue, size)
        return itertools.chain.from_iterable(self.throttle(groups))

    def suppressed(self, exception, func: Callable, iterable: Iterable) -> Iterator[tuple]:
        """Generate `arg, func(arg)` pairs while exception isn't raised."""
        queue = list(iterable)
        for arg in self.stream(queue):
            try:
                yield arg, func(arg)
            except exception:
                queue.append(arg)

    def filtered(self, predicate: Callable, func: Callable, iterable: Iterable) -> Iterator[tuple]:
        """Generate `arg, func(arg)` pairs while predicate evaluates to true."""
        queue = list(iterable)
        for arg in self.stream(queue):
            result = func(arg)
            if predicate(result):
                yield arg, result
            else:
                queue.append(arg)

    @overload
    def repeat(self, func, *args, **kwargs):
        """Repeat function call."""
        return (func(*args, **kwargs) for _ in self)

    @overload
    async def repeat(self, func: iscoro, *args, **kwargs):
        async for _ in self:
            yield await func(*args, **kwargs)

    @overload
    def retry(self, exception, func, *args, **kwargs):
        """Repeat function call until exception isn't raised."""
        for _ in self:
            with suppress(exception) as excs:
                return func(*args, **kwargs)
        raise excs[0]

    @overload
    async def retry(self, exception, func: iscoro, *args, **kwargs):
        async for _ in self:
            with suppress(exception) as excs:
                return await func(*args, **kwargs)
        raise excs[0]

    @overload
    def poll(self, predicate, func, *args, **kwargs):
        """Repeat function call until predicate evaluates to true."""
        return first(predicate, self.repeat(func, *args, **kwargs))

    @overload
    async def poll(self, predicate, func: iscoro, *args, **kwargs):
        async for result in self.repeat(func, *args, **kwargs):
            if predicate(result):  # pragma: no branch
                return result
        raise StopAsyncIteration

    def repeating(self, func: Callable):
        """A decorator for `repeat`."""
        return partialmethod(self.repeat, func)

    def retrying(self, exception: Exception):
        """Return a decorator for `retry`."""
        return partial(partialmethod, self.retry, exception)

    def polling(self, predicate: Callable):
        """Return a decorator for `poll`."""
        return partial(partialmethod, self.poll, predicate)

__add__(step)

Generate incremental backoff.

Source code in waiter/__init__.py
165
166
167
def __add__(self, step) -> 'waiter':
    """Generate incremental backoff."""
    return self.map(operator.add, reiter(itertools.count, 0, step))

__aiter__() async

Asynchronously generate a slow loop of elapsed time.

Source code in waiter/__init__.py
110
111
112
113
114
115
116
117
118
119
async def __aiter__(self):
    """Asynchronously generate a slow loop of elapsed time."""
    start = time.time()
    yield self.stats.add(0, 0.0)
    for attempt, delay in enumerate(self.delays, 1):
        remaining = start + self.timeout - time.time()
        if remaining < 0:
            break
        await asyncio.sleep(min(delay, remaining))
        yield self.stats.add(attempt, time.time() - start)

__ge__(floor)

Limit minimum delay generated.

Source code in waiter/__init__.py
161
162
163
def __ge__(self, floor) -> 'waiter':
    """Limit minimum delay generated."""
    return self.map(partial(max, floor))

__getitem__(slc)

Slice delays, e.g., to limit attempt count.

Source code in waiter/__init__.py
153
154
155
def __getitem__(self, slc: slice) -> 'waiter':
    """Slice delays, e.g., to limit attempt count."""
    return self.clone(itertools.islice, self.delays, slc.start, slc.stop, slc.step)

__iter__()

Generate a slow loop of elapsed time.

Source code in waiter/__init__.py
 99
100
101
102
103
104
105
106
107
108
def __iter__(self):
    """Generate a slow loop of elapsed time."""
    start = time.time()
    yield self.stats.add(0, 0.0)
    for attempt, delay in enumerate(self.delays, 1):
        remaining = start + self.timeout - time.time()
        if remaining < 0:
            break
        time.sleep(min(delay, remaining))
        yield self.stats.add(attempt, time.time() - start)

__le__(ceiling)

Limit maximum delay generated.

Source code in waiter/__init__.py
157
158
159
def __le__(self, ceiling) -> 'waiter':
    """Limit maximum delay generated."""
    return self.map(partial(min, ceiling))

__mul__(factor)

Generate exponential backoff.

Source code in waiter/__init__.py
169
170
171
def __mul__(self, factor) -> 'waiter':
    """Generate exponential backoff."""
    return self.map(operator.mul, reiter(map, factor.__pow__, reiter(itertools.count)))

accumulate(*args, **kwargs) classmethod

Create waiter based on itertools.accumulate.

Source code in waiter/__init__.py
138
139
140
141
@classmethod
def accumulate(cls, *args, **kwargs) -> 'waiter':
    """Create waiter based on `itertools.accumulate`."""
    return cls(reiter(itertools.accumulate, *args), **kwargs)

count(*args, **kwargs) classmethod

Create waiter based on itertools.count.

Source code in waiter/__init__.py
133
134
135
136
@classmethod
def count(cls, *args, **kwargs) -> 'waiter':
    """Create waiter based on `itertools.count`."""
    return cls(reiter(itertools.count, *args), **kwargs)

exponential(base, **kwargs) classmethod

Create waiter with exponential backoff.

Source code in waiter/__init__.py
143
144
145
146
@classmethod
def exponential(cls, base, **kwargs) -> 'waiter':
    """Create waiter with exponential backoff."""
    return cls.count(**kwargs).map(base.__pow__)

fibonacci(delay, **kwargs) classmethod

Create waiter with fibonacci backoff.

Source code in waiter/__init__.py
128
129
130
131
@classmethod
def fibonacci(cls, delay, **kwargs) -> 'waiter':
    """Create waiter with fibonacci backoff."""
    return cls(reiter(fibonacci, delay, delay), **kwargs)

filtered(predicate, func, iterable)

Generate arg, func(arg) pairs while predicate evaluates to true.

Source code in waiter/__init__.py
209
210
211
212
213
214
215
216
217
def filtered(self, predicate: Callable, func: Callable, iterable: Iterable) -> Iterator[tuple]:
    """Generate `arg, func(arg)` pairs while predicate evaluates to true."""
    queue = list(iterable)
    for arg in self.stream(queue):
        result = func(arg)
        if predicate(result):
            yield arg, result
        else:
            queue.append(arg)

map(func, *iterables)

Return new waiter with function mapped across delays.

Source code in waiter/__init__.py
124
125
126
def map(self, func: Callable, *iterables: Iterable) -> 'waiter':
    """Return new waiter with function mapped across delays."""
    return self.clone(map, func, self.delays, *iterables)

polling(predicate)

Return a decorator for poll.

Source code in waiter/__init__.py
264
265
266
def polling(self, predicate: Callable):
    """Return a decorator for `poll`."""
    return partial(partialmethod, self.poll, predicate)

polynomial(exp, **kwargs) classmethod

Create waiter with polynomial backoff.

Source code in waiter/__init__.py
148
149
150
151
@classmethod
def polynomial(cls, exp, **kwargs) -> 'waiter':
    """Create waiter with polynomial backoff."""
    return cls.count(**kwargs).map(exp.__rpow__)

random(start, stop)

Add random jitter within given range.

Source code in waiter/__init__.py
173
174
175
def random(self, start, stop) -> 'waiter':
    """Add random jitter within given range."""
    return self.map(lambda delay: delay + random.uniform(start, stop))

repeating(func)

A decorator for repeat.

Source code in waiter/__init__.py
256
257
258
def repeating(self, func: Callable):
    """A decorator for `repeat`."""
    return partialmethod(self.repeat, func)

retrying(exception)

Return a decorator for retry.

Source code in waiter/__init__.py
260
261
262
def retrying(self, exception: Exception):
    """Return a decorator for `retry`."""
    return partial(partialmethod, self.retry, exception)

stream(queue, size=None)

Generate chained values in groups from an iterable.

The queue can be extended while in use.

Source code in waiter/__init__.py
189
190
191
192
193
194
195
196
197
198
def stream(self, queue: Iterable, size: Optional[int] = None) -> Iterator:
    """Generate chained values in groups from an iterable.

    The queue can be extended while in use.
    """
    it = iter(queue)
    groups = iter(lambda: list(itertools.islice(it, size)), [])
    if isinstance(queue, Sequence):
        groups = grouped(queue, size)
    return itertools.chain.from_iterable(self.throttle(groups))

suppressed(exception, func, iterable)

Generate arg, func(arg) pairs while exception isn't raised.

Source code in waiter/__init__.py
200
201
202
203
204
205
206
207
def suppressed(self, exception, func: Callable, iterable: Iterable) -> Iterator[tuple]:
    """Generate `arg, func(arg)` pairs while exception isn't raised."""
    queue = list(iterable)
    for arg in self.stream(queue):
        try:
            yield arg, func(arg)
        except exception:
            queue.append(arg)

waiter.suppress(*exceptions)

Variant of contextlib.suppress, which also records exception.

Source code in waiter/__init__.py
26
27
28
29
30
31
32
33
@contextlib.contextmanager
def suppress(*exceptions: Exception):
    """Variant of `contextlib.suppress`, which also records exception."""
    excs: list = []
    try:
        yield excs
    except exceptions as exc:  # type: ignore
        excs.append(exc)

waiter.first(predicate, iterable, *default)

Return first item which evaluates to true, like any with filtering.

Source code in waiter/__init__.py
36
37
38
def first(predicate: Callable, iterable: Iterable, *default):
    """Return first item which evaluates to true, like `any` with filtering."""
    return next(filter(predicate, iterable), *default)