Skip to content

Reference

clients.base.BaseClient

Client mixin.

Parameters:

Name Type Description Default
url str

base url for requests

required
trailing str

trailing chars (e.g. /) appended to the url

''
**attrs

additional Session attributes

{}
Source code in clients/base.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
class BaseClient:
    """Mixin which resolves request paths against a base url.

    The cooperative `super()` calls forward `base_url` and the remaining
    attributes to the next class in the method resolution order.

    Args:
        url: base url for requests
        trailing: trailing chars (e.g. /) appended to the url
        **attrs: additional Session attributes
    """

    def __init__(self, url: str, *, trailing: str = '', **attrs):
        # Normalize the base so that it ends with exactly one slash.
        base_url = url.rstrip('/') + '/'
        super().__init__(base_url=base_url, **attrs)  # type: ignore
        # Remembered so that `clone` can build equivalent clients later.
        self._attrs = attrs
        self.trailing = trailing

    def __repr__(self):
        name = type(self).__name__
        return f'{name}({self.url}... {self.trailing})'

    def __truediv__(self, path: str) -> Client:
        """Return a clone of the same type with `path` appended."""
        cls = type(self)
        return cls.clone(self, path)

    @property
    def url(self):
        """The base url as a string."""
        return str(self.base_url)

    @classmethod
    def clone(cls, other, path='', **kwargs):
        """Return a new `cls` client derived from `other` joined with `path`.

        Attributes recorded on `other` take precedence over `kwargs`.
        """
        url = str(other.base_url.join(path))
        options = {**kwargs, **other._attrs}
        return cls(url, trailing=other.trailing, **options)

    def request(self, method, path, **kwargs):
        """Resolve `path` (relative or absolute) and send the request."""
        joined = str(self.base_url.join(path))
        # Strip any trailing slashes, then apply the configured suffix.
        url = joined.rstrip('/') + self.trailing
        return super().request(method, url, **kwargs)

    def get(self, path='', **kwargs):
        """Send a GET request to the optional `path`."""
        method = 'GET'
        return self.request(method, path, **kwargs)

    def options(self, path='', **kwargs):
        """Send an OPTIONS request to the optional `path`."""
        method = 'OPTIONS'
        return self.request(method, path, **kwargs)

    def head(self, path='', **kwargs):
        """Send a HEAD request to the optional `path`."""
        method = 'HEAD'
        return self.request(method, path, **kwargs)

    def post(self, path='', json=None, **kwargs):
        """Send a POST request to the optional `path` with a json body."""
        method = 'POST'
        return self.request(method, path, json=json, **kwargs)

    def put(self, path='', json=None, **kwargs):
        """Send a PUT request to the optional `path` with a json body."""
        method = 'PUT'
        return self.request(method, path, json=json, **kwargs)

    def patch(self, path='', json=None, **kwargs):
        """Send a PATCH request to the optional `path` with a json body."""
        method = 'PATCH'
        return self.request(method, path, json=json, **kwargs)

    def delete(self, path='', **kwargs):
        """Send a DELETE request to the optional `path`."""
        method = 'DELETE'
        return self.request(method, path, **kwargs)

__truediv__(path)

Return a cloned client with appended path.

Source code in clients/base.py
45
46
47
def __truediv__(self, path: str) -> Client:
    """Return a clone of the same type with `path` appended."""
    cls = type(self)
    return cls.clone(self, path)

delete(path='', **kwargs)

DELETE request with optional path.

Source code in clients/base.py
88
89
90
def delete(self, path='', **kwargs):
    """Send a DELETE request to the optional `path`."""
    method = 'DELETE'
    return self.request(method, path, **kwargs)

get(path='', **kwargs)

GET request with optional path.

Source code in clients/base.py
64
65
66
def get(self, path='', **kwargs):
    """Send a GET request to the optional `path`."""
    method = 'GET'
    return self.request(method, path, **kwargs)

head(path='', **kwargs)

HEAD request with optional path.

Source code in clients/base.py
72
73
74
def head(self, path='', **kwargs):
    """Send a HEAD request to the optional `path`."""
    method = 'HEAD'
    return self.request(method, path, **kwargs)

options(path='', **kwargs)

OPTIONS request with optional path.

Source code in clients/base.py
68
69
70
def options(self, path='', **kwargs):
    """Send an OPTIONS request to the optional `path`."""
    method = 'OPTIONS'
    return self.request(method, path, **kwargs)

patch(path='', json=None, **kwargs)

PATCH request with optional path and json body.

Source code in clients/base.py
84
85
86
def patch(self, path='', json=None, **kwargs):
    """Send a PATCH request to the optional `path` with a json body."""
    method = 'PATCH'
    return self.request(method, path, json=json, **kwargs)

post(path='', json=None, **kwargs)

POST request with optional path and json body.

Source code in clients/base.py
76
77
78
def post(self, path='', json=None, **kwargs):
    """Send a POST request to the optional `path` with a json body."""
    method = 'POST'
    return self.request(method, path, json=json, **kwargs)

put(path='', json=None, **kwargs)

PUT request with optional path and json body.

Source code in clients/base.py
80
81
82
def put(self, path='', json=None, **kwargs):
    """Send a PUT request to the optional `path` with a json body."""
    method = 'PUT'
    return self.request(method, path, json=json, **kwargs)

request(method, path, **kwargs)

Send request with relative or absolute path and return response.

Source code in clients/base.py
59
60
61
62
def request(self, method, path, **kwargs):
    """Resolve `path` (relative or absolute) and send the request."""
    joined = str(self.base_url.join(path))
    # Strip any trailing slashes, then apply the configured suffix.
    url = joined.rstrip('/') + self.trailing
    return super().request(method, url, **kwargs)

clients.Client

Bases: BaseClient, Client

Source code in clients/base.py
93
94
95
96
97
class Client(BaseClient, httpx.Client):  # type: ignore
    """An `httpx.Client` with the base-url handling of `BaseClient`."""

    def stream(self, method, path, **kwargs):  # type: ignore
        """Resolve `path` (relative or absolute) and stream the response."""
        joined = str(self.base_url.join(path))
        # Strip any trailing slashes, then apply the configured suffix.
        url = joined.rstrip('/') + self.trailing
        return super().stream(method, url, **kwargs)

stream(method, path, **kwargs)

Send request with relative or absolute path and stream response.

Source code in clients/base.py
94
95
96
97
def stream(self, method, path, **kwargs):  # type: ignore
    """Resolve `path` (relative or absolute) and stream the response."""
    joined = str(self.base_url.join(path))
    # Strip any trailing slashes, then apply the configured suffix.
    url = joined.rstrip('/') + self.trailing
    return super().stream(method, url, **kwargs)

clients.Resource

Bases: Client

A Client which returns json content and has syntactic support for requests.

Source code in clients/base.py
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
class Resource(Client):
    """A `Client` which returns json content and has syntactic support for requests."""

    client = property(Client.clone, doc="upcasted `Client`")
    # Syntactic support: item access maps to GET / PUT / DELETE requests, and
    # attribute access for names not otherwise found clones the resource with
    # that name as the appended path.
    __getitem__ = Client.get
    __setitem__ = Client.put
    __delitem__ = Client.delete
    __getattr__ = Client.__truediv__
    content_type = functools.partial(content_type, text='text/', json=r'application/(\w|\.)*\+?json')

    def request(self, method, path, **kwargs):
        """Send request with path and return processed content.

        Returns decoded json, text, or raw bytes depending on the detected
        content type. Error status codes raise via `raise_for_status`.
        """
        response = super().request(method, path, **kwargs).raise_for_status()
        content_type = self.content_type(response)
        if content_type == 'json':
            return response.json()
        return response.text if content_type == 'text' else response.content

    def stream(self, method: str = 'GET', path: str = '', **kwargs) -> Iterator:  # type: ignore
        """Iterate lines or chunks from streamed request.

        Json responses yield one decoded value per line, text responses yield
        lines, and anything else yields byte chunks.
        """
        with super().stream(method, path, **kwargs) as response:
            response.raise_for_status()
            content_type = self.content_type(response)
            if content_type == 'json':
                yield from map(json.loads, response.iter_lines())
            elif content_type == 'text':
                yield from response.iter_lines()
            else:
                yield from response.iter_bytes()

    __iter__ = stream

    def __contains__(self, path: str):
        """Return whether endpoint exists according to HEAD request."""
        return not super().request('HEAD', path).is_error

    def __call__(self, path: str = '', **params):
        """GET request with params."""
        return self.get(path, params=params)

    def updater(self, path='', **kwargs):
        """Generator driving a GET followed by a PUT.

        First yields the fetched json; the value sent back in is then PUT with
        the headers returned by `validate(response)` merged in, and the PUT
        result is yielded.
        """
        response = super().request('GET', path, **kwargs).raise_for_status()
        kwargs['headers'] = dict(kwargs.get('headers', {}), **validate(response))
        yield self.put(path, (yield response.json()), **kwargs)

    @contextlib.contextmanager
    def updating(self, path: str = '', **kwargs):
        """Context manager to GET and conditionally PUT json data."""
        updater = self.updater(path, **kwargs)
        json = next(updater)
        yield json
        # Only reached when the managed block completes without raising.
        updater.send(json)

    def update(self, path: str = '', callback: Callable | None = None, **json):
        """PATCH request with json params.

        Args:
            callback: optionally update with GET and validated PUT.
                `callback` is called on the json result with keyword params, i.e.,
                `dict` correctly implements the simple update case.
        """
        if callback is None:
            return self.patch(path, json=json)
        updater = self.updater(path)
        return updater.send(callback(next(updater), **json))

    def create(self, path: str = '', json=None, **kwargs) -> str:
        """POST request and return location."""
        response = super().request('POST', path, json=json, **kwargs).raise_for_status()
        return response.headers.get('location')

    def download(self, file, path: str = '', **kwargs):
        """Output streamed GET request to file."""
        for chunk in self.stream(path=path, **kwargs):
            file.write(chunk)
        return file

    def authorize(self, path: str = '', **kwargs) -> dict:
        """Acquire oauth access token and set `Authorization` header.

        A POST is sent when a `json` or `data` body is supplied, otherwise a GET.
        Returns the token response.
        """
        method = 'GET' if {'json', 'data'}.isdisjoint(kwargs) else 'POST'
        result = self.request(method, path, **kwargs)
        self.headers['authorization'] = f"{result['token_type']} {result['access_token']}"
        # Clones are constructed from `_attrs`; record the headers there too so
        # that cloned resources remain authorized (as `AsyncResource` does).
        self._attrs['headers'] = self.headers
        return result

__call__(path='', **params)

GET request with params.

Source code in clients/base.py
136
137
138
def __call__(self, path: str = '', **params):
    """Send a GET request, passing the keyword arguments as query params."""
    query = params
    return self.get(path, params=query)

__contains__(path)

Return whether endpoint exists according to HEAD request.

Source code in clients/base.py
132
133
134
def __contains__(self, path: str):
    """Return whether a HEAD request to `path` succeeds without an error status."""
    response = super().request('HEAD', path)
    return not response.is_error

authorize(path='', **kwargs)

Acquire oauth access token and set Authorization header.

Source code in clients/base.py
177
178
179
180
181
182
def authorize(self, path: str = '', **kwargs) -> dict:
    """Acquire oauth access token and set `Authorization` header.

    A POST is sent when a `json` or `data` body is supplied, otherwise a GET.
    Returns the token response.
    """
    method = 'GET' if {'json', 'data'}.isdisjoint(kwargs) else 'POST'
    result = self.request(method, path, **kwargs)
    self.headers['authorization'] = f"{result['token_type']} {result['access_token']}"
    # Clones are constructed from `_attrs`; record the headers there too so
    # that cloned resources remain authorized (as the async variant does).
    self._attrs['headers'] = self.headers
    return result

create(path='', json=None, **kwargs)

POST request and return location.

Source code in clients/base.py
166
167
168
169
def create(self, path: str = '', json=None, **kwargs) -> str:
    """Send a POST request and return the response's `location` header."""
    response = super().request('POST', path, json=json, **kwargs)
    headers = response.raise_for_status().headers
    return headers.get('location')

download(file, path='', **kwargs)

Output streamed GET request to file.

Source code in clients/base.py
171
172
173
174
175
def download(self, file, path: str = '', **kwargs):
    """Write every chunk of the streamed request to `file` and return `file`."""
    chunks = self.stream(path=path, **kwargs)
    for data in chunks:
        file.write(data)
    return file

request(method, path, **kwargs)

Send request with path and return processed content.

Source code in clients/base.py
110
111
112
113
114
115
116
def request(self, method, path, **kwargs):
    """Send the request and return json, text, or bytes by content type."""
    response = super().request(method, path, **kwargs).raise_for_status()
    kind = self.content_type(response)
    if kind == 'json':
        return response.json()
    if kind == 'text':
        return response.text
    return response.content

stream(method='GET', path='', **kwargs)

Iterate lines or chunks from streamed request.

Source code in clients/base.py
118
119
120
121
122
123
124
125
126
127
128
def stream(self, method: str = 'GET', path: str = '', **kwargs) -> Iterator:  # type: ignore
    """Yield decoded json lines, text lines, or byte chunks from a streamed request."""
    with super().stream(method, path, **kwargs) as response:
        response.raise_for_status()
        kind = self.content_type(response)
        # Select the iterator for the detected content type, then delegate.
        if kind == 'json':
            chunks = map(json.loads, response.iter_lines())
        elif kind == 'text':
            chunks = response.iter_lines()
        else:
            chunks = response.iter_bytes()
        yield from chunks

update(path='', callback=None, **json)

PATCH request with json params.

Parameters:

Name Type Description Default
callback Callable | None

optionally update with GET and validated PUT. callback is called on the json result with keyword params, i.e., dict correctly implements the simple update case.

None
Source code in clients/base.py
153
154
155
156
157
158
159
160
161
162
163
164
def update(self, path: str = '', callback: Callable | None = None, **json):
    """Send a PATCH with the json params, or GET then PUT via `callback`.

    Args:
        callback: optionally update with GET and validated PUT.
            `callback` is called on the json result with keyword params, i.e.,
            `dict` correctly implements the simple update case.
    """
    if callback is not None:
        updater = self.updater(path)
        current = next(updater)
        # Sending the modified value makes the updater perform the PUT.
        return updater.send(callback(current, **json))
    return self.patch(path, json=json)

updating(path='', **kwargs)

Context manager to GET and conditionally PUT json data.

Source code in clients/base.py
145
146
147
148
149
150
151
@contextlib.contextmanager
def updating(self, path: str = '', **kwargs):
    """Context manager which GETs json data and PUTs it back on normal exit."""
    exchange = self.updater(path, **kwargs)
    document = next(exchange)
    yield document
    # Only reached when the managed block completes without raising.
    exchange.send(document)

clients.Remote

Bases: Client

A Client which defaults to posts with json bodies, i.e., RPC.

Parameters:

Name Type Description Default
url str

base url for requests

required
json Mapping

default json body for all calls

{}
**kwargs

same options as Client

{}
Source code in clients/base.py
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
class Remote(Client):
    """A `Client` whose calls default to POST requests with json bodies, i.e., RPC.

    Args:
        url: base url for requests
        json: default json body for all calls
        **kwargs: same options as `Client`
    """

    client = Resource.client
    __getattr__ = Resource.__getattr__

    def __init__(self, url: str, json: Mapping = {}, **kwargs):
        super().__init__(url, **kwargs)
        # Stored as a copy of the supplied mapping.
        self.json = dict(json)

    @classmethod
    def clone(cls, other, path=''):
        """Return a clone of `other` which carries over its default json body."""
        base_clone = Client.clone.__func__
        return base_clone(cls, other, path, json=other.json)

    def __call__(self, path: str = '', **json):
        """POST request with json body and [check][clients.base.Remote.check] result."""
        # Per-call keywords override the default body.
        body = dict(self.json, **json)
        response = self.post(path, json=body).raise_for_status()
        return self.check(response.json())

    @staticmethod
    def check(result):
        """Hook to return the result or raise, for APIs which don't use status codes."""
        return result

__call__(path='', **json)

POST request with json body and check result.

Source code in clients/base.py
205
206
207
208
def __call__(self, path: str = '', **json):
    """POST request with json body and [check][clients.base.Remote.check] result."""
    # Per-call keywords override the default body.
    body = dict(self.json, **json)
    response = self.post(path, json=body).raise_for_status()
    return self.check(response.json())

check(result) staticmethod

Override to return result or raise error, for APIs which don't use status codes.

Source code in clients/base.py
210
211
212
213
@staticmethod
def check(result):
    """Hook to return the result or raise, for APIs which don't use status codes.

    The default implementation returns `result` unchanged.
    """
    return result

clients.Graph

Bases: Remote

A Remote client which executes GraphQL queries.

Source code in clients/base.py
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
class Graph(Remote):
    """A `Remote` client which executes GraphQL queries."""

    # Exception type raised by `check`; subclasses may substitute their own.
    Error = httpx.HTTPError

    @classmethod
    def check(cls, result: dict):
        """Return `data` or raise `errors`.

        Raises:
            cls.Error: with the first entry of `errors`, when any are present.
        """
        # `or ()` also tolerates an explicit `"errors": null` in the result.
        for error in result.get('errors') or ():
            raise cls.Error(error)
        return result.get('data')

    def execute(self, query: str, **variables):
        """Execute query over POST.

        Keyword arguments are sent as the query's `variables`.
        """
        return self(query=query, variables=variables)

check(result) classmethod

Return data or raise errors.

Source code in clients/base.py
221
222
223
224
225
226
@classmethod
def check(cls, result: dict):
    """Return `data` or raise `errors`.

    Raises:
        cls.Error: with the first entry of `errors`, when any are present.
    """
    # `or ()` also tolerates an explicit `"errors": null` in the result.
    for error in result.get('errors') or ():
        raise cls.Error(error)
    return result.get('data')

execute(query, **variables)

Execute query over POST.

Source code in clients/base.py
228
229
230
def execute(self, query: str, **variables):
    """Call the client with `query` and the keyword arguments as `variables`."""
    payload = {'query': query, 'variables': variables}
    return self(**payload)

clients.Proxy

Bases: Client

An extensible embedded proxy client to multiple hosts.

The default implementation provides load balancing based on active connections. It does not provide error handling or retrying.

Parameters:

Name Type Description Default
*urls str

base urls for requests

()
**kwargs

same options as Client

{}
Source code in clients/base.py
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
class Proxy(Client):
    """An extensible embedded proxy client to multiple hosts.

    The default implementation provides load balancing based on active connections.
    It does not provide error handling or retrying.

    Args:
        *urls: base urls for requests
        **kwargs: same options as `Client`
    """

    Stats = Stats

    def __init__(self, *urls: str, **kwargs):
        # The base url is a placeholder; requests are sent to a chosen url.
        super().__init__('https://proxies', **kwargs)
        self.urls = {}
        for url in urls:
            # Each host url is normalized to end with exactly one slash.
            self.urls[url.rstrip('/') + '/'] = self.Stats()

    @classmethod
    def clone(cls, other, path=''):
        """Return a new proxy with `path` joined onto each url of `other`."""
        urls = [urljoin(base, path) for base in other.urls]
        return cls(*urls, trailing=other.trailing, **other._attrs)

    def priority(self, url: str):
        """Return comparable priority for url.

        Minimizes errors, failures (500s), and active connections.
        None may be used to eliminate from consideration.
        """
        stats = self.urls[url]
        return (stats['errors'], stats['failures'], stats['connections'])

    def choice(self, method: str) -> str:
        """Return chosen url according to priority.

        Args:
            method: placeholder for extensions which distinguish read/write requests
        """
        grouped = {}
        for candidate in self.urls:
            grouped.setdefault(self.priority(candidate), []).append(candidate)
        # A priority of None removes those urls from consideration.
        grouped.pop(None, None)
        best = min(grouped)
        return random.choice(grouped[best])

    def request(self, method, path, **kwargs):
        """Send request with relative or absolute path and return response."""
        url = self.choice(method)
        with self.urls[url] as stats:
            response = super().request(method, urljoin(url, path), **kwargs)
        failed = response.status_code >= 500
        stats.add(failures=int(failed))
        return response

choice(method)

Return chosen url according to priority.

Parameters:

Name Type Description Default
method str

placeholder for extensions which distinguish read/write requests

required
Source code in clients/base.py
286
287
288
289
290
291
292
293
294
295
296
def choice(self, method: str) -> str:
    """Return a randomly chosen url among those with the lowest priority value.

    Args:
        method: placeholder for extensions which distinguish read/write requests
    """
    grouped = {}
    for candidate in self.urls:
        grouped.setdefault(self.priority(candidate), []).append(candidate)
    # A priority of None removes those urls from consideration.
    grouped.pop(None, None)
    best = min(grouped)
    return random.choice(grouped[best])

priority(url)

Return comparable priority for url.

Minimizes errors, failures (500s), and active connections. None may be used to eliminate from consideration.

Source code in clients/base.py
277
278
279
280
281
282
283
284
def priority(self, url: str):
    """Return the comparable priority tuple for `url`.

    The tuple orders by errors, then failures (500s), then active connections,
    so the minimum is preferred.
    None may be used to eliminate from consideration.
    """
    stats = self.urls[url]
    return (stats['errors'], stats['failures'], stats['connections'])

request(method, path, **kwargs)

Send request with relative or absolute path and return response.

Source code in clients/base.py
298
299
300
301
302
303
304
def request(self, method, path, **kwargs):
    """Send the request to the chosen url and record whether it failed (500s)."""
    url = self.choice(method)
    with self.urls[url] as stats:
        response = super().request(method, urljoin(url, path), **kwargs)
    failed = response.status_code >= 500
    stats.add(failures=int(failed))
    return response

clients.AsyncClient

Bases: BaseClient, AsyncClient

Source code in clients/aio.py
10
11
12
13
class AsyncClient(BaseClient, httpx.AsyncClient):  # type: ignore
    """An `httpx.AsyncClient` with the base-url handling of `BaseClient`."""

    def run(self, name: str, *args, **kwargs):
        """Call the named method and run its coroutine to completion synchronously."""
        # A fresh event loop is created for each call.
        loop = asyncio.new_event_loop()
        method = getattr(self, name)
        return loop.run_until_complete(method(*args, **kwargs))

run(name, *args, **kwargs)

Synchronously call method and run coroutine.

Source code in clients/aio.py
11
12
13
def run(self, name: str, *args, **kwargs):
    """Call the named method and run its coroutine to completion synchronously."""
    # A fresh event loop is created for each call.
    loop = asyncio.new_event_loop()
    method = getattr(self, name)
    return loop.run_until_complete(method(*args, **kwargs))

clients.AsyncResource

Bases: AsyncClient

An AsyncClient which returns json content and has syntactic support for requests.

Source code in clients/aio.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
class AsyncResource(AsyncClient):
    """An `AsyncClient` which returns json content and has syntactic support for requests."""

    client = property(AsyncClient.clone, doc="upcasted `AsyncClient`")
    # Attribute access for names not otherwise found clones with an appended
    # path; item access issues a GET request.
    __getattr__ = AsyncClient.__truediv__
    __getitem__ = AsyncClient.get
    content_type = Resource.content_type
    __call__ = Resource.__call__

    async def request(self, method, path, **kwargs):
        """Send the request and return json, text, or bytes."""
        response = await super().request(method, path, **kwargs)
        response = response.raise_for_status()
        if self.content_type(response) == 'json':
            return response.json()
        # NOTE(review): text is selected by `response.encoding` here, whereas the
        # synchronous `Resource` checks for a text content type; confirm intended.
        if response.encoding:
            return response.text
        return response.content

    async def updater(self, path='', **kwargs):
        """Async generator driving a GET followed by a PUT of the value sent in."""
        response = await super().request('GET', path, **kwargs)
        response = response.raise_for_status()
        # Headers returned by `validate(response)` are merged over any supplied.
        kwargs['headers'] = dict(kwargs.get('headers', {}), **validate(response))
        replacement = yield response.json()
        yield await self.put(path, replacement, **kwargs)

    @contextlib.asynccontextmanager
    async def updating(self, path: str = '', **kwargs):
        """Context manager which GETs json data and PUTs it back on normal exit."""
        exchange = self.updater(path, **kwargs)
        document = await exchange.__anext__()
        yield document
        # Only reached when the managed block completes without raising.
        await exchange.asend(document)

    async def update(self, path: str = '', callback: Callable | None = None, **json):
        """Send a PATCH with the json params, or GET then PUT via `callback`.

        Args:
            callback: optionally update with GET and validated PUT.
                `callback` is called on the json result with keyword params, i.e.,
                `dict` correctly implements the simple update case.
        """
        if callback is not None:
            updater = self.updater(path)
            current = await updater.__anext__()
            return await updater.asend(callback(current, **json))
        return await self.patch(path, json)

    async def authorize(self, path: str = '', **kwargs) -> dict:
        """Acquire oauth access token and set `Authorization` header."""
        has_body = not {'json', 'data'}.isdisjoint(kwargs)
        method = 'POST' if has_body else 'GET'
        result = await self.request(method, path, **kwargs)
        token = f"{result['token_type']} {result['access_token']}"
        self.headers['authorization'] = token
        # Recorded in `_attrs` so that clones are built with the same headers.
        self._attrs['headers'] = self.headers
        return result

authorize(path='', **kwargs) async

Acquire oauth access token and set Authorization header.

Source code in clients/aio.py
58
59
60
61
62
63
64
async def authorize(self, path: str = '', **kwargs) -> dict:
    """Request an oauth access token, set the `Authorization` header, and return the result."""
    has_body = not {'json', 'data'}.isdisjoint(kwargs)
    method = 'POST' if has_body else 'GET'
    result = await self.request(method, path, **kwargs)
    token = f"{result['token_type']} {result['access_token']}"
    self.headers['authorization'] = token
    # Recorded in `_attrs` alongside the other constructor attributes.
    self._attrs['headers'] = self.headers
    return result

request(method, path, **kwargs) async

Send request with path and return processed content.

Source code in clients/aio.py
25
26
27
28
29
30
async def request(self, method, path, **kwargs):
    """Send the request and return json, text, or bytes."""
    response = await super().request(method, path, **kwargs)
    response = response.raise_for_status()
    if self.content_type(response) == 'json':
        return response.json()
    if response.encoding:
        return response.text
    return response.content

update(path='', callback=None, **json) async

PATCH request with json params.

Parameters:

Name Type Description Default
callback Callable | None

optionally update with GET and validated PUT. callback is called on the json result with keyword params, i.e., dict correctly implements the simple update case.

None
Source code in clients/aio.py
45
46
47
48
49
50
51
52
53
54
55
56
async def update(self, path: str = '', callback: Callable | None = None, **json):
    """Send a PATCH with the json params, or GET then PUT via `callback`.

    Args:
        callback: optionally update with GET and validated PUT.
            `callback` is called on the json result with keyword params, i.e.,
            `dict` correctly implements the simple update case.
    """
    if callback is not None:
        updater = self.updater(path)
        current = await updater.__anext__()
        # Sending the modified value makes the updater perform the PUT.
        return await updater.asend(callback(current, **json))
    return await self.patch(path, json)

updating(path='', **kwargs) async

Context manager to GET and conditionally PUT json data.

Source code in clients/aio.py
37
38
39
40
41
42
43
@contextlib.asynccontextmanager
async def updating(self, path: str = '', **kwargs):
    """Context manager which GETs json data and PUTs it back on normal exit."""
    exchange = self.updater(path, **kwargs)
    document = await exchange.__anext__()
    yield document
    # Only reached when the managed block completes without raising.
    await exchange.asend(document)

clients.AsyncRemote

Bases: AsyncClient

An AsyncClient which defaults to posts with json bodies, i.e., RPC.

Parameters:

Name Type Description Default
url str

base url for requests

required
json Mapping

default json body for all calls

{}
**kwargs

same options as AsyncClient

{}
Source code in clients/aio.py
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
class AsyncRemote(AsyncClient):
    """An `AsyncClient` whose calls default to POST requests with json bodies, i.e., RPC.

    Args:
        url: base url for requests
        json: default json body for all calls
        **kwargs: same options as `AsyncClient`
    """

    client = AsyncResource.client
    __getattr__ = AsyncResource.__getattr__
    check = staticmethod(Remote.check)

    def __init__(self, url: str, json: Mapping = {}, **kwargs):
        super().__init__(url, **kwargs)
        # Stored as a copy of the supplied mapping.
        self.json = dict(json)

    @classmethod
    def clone(cls, other, path=''):
        """Return a clone of `other` which carries over its default json body."""
        base_clone = AsyncClient.clone.__func__
        return base_clone(cls, other, path, json=other.json)

    async def __call__(self, path='', **json):
        """Send a POST request with the merged json body and return the checked result."""
        # Per-call keywords override the default body.
        body = dict(self.json, **json)
        response = await self.post(path, json=body)
        response = response.raise_for_status()
        return self.check(response.json())

__call__(path='', **json) async

POST request with json body and check result.

Source code in clients/aio.py
88
89
90
91
async def __call__(self, path='', **json):
    """Send a POST request with the merged json body and return the checked result."""
    # Per-call keywords override the default body.
    body = dict(self.json, **json)
    response = await self.post(path, json=body)
    response = response.raise_for_status()
    return self.check(response.json())

clients.AsyncGraph

Bases: AsyncRemote

An AsyncRemote client which executes GraphQL queries.

Source code in clients/aio.py
94
95
96
97
98
99
class AsyncGraph(AsyncRemote):
    """An `AsyncRemote` client which executes GraphQL queries."""

    # Exception type raised by `check` for an entry in a result's `errors`.
    Error = httpx.HTTPError
    # Shared with `Graph`: `execute` returns `self(query=..., variables=...)`,
    # which here resolves to the asynchronous `__call__` of `AsyncRemote`.
    execute = Graph.execute
    # Rebind the underlying function so `cls` is this class rather than `Graph`.
    check = classmethod(Graph.check.__func__)  # type: ignore

clients.AsyncProxy

Bases: AsyncClient

An extensible embedded proxy client to multiple hosts.

The default implementation provides load balancing based on active connections. It does not provide error handling or retrying.

Parameters:

Name Type Description Default
*urls str

base urls for requests

()
**kwargs

same options as AsyncClient

{}
Source code in clients/aio.py
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
class AsyncProxy(AsyncClient):
    """An extensible embedded proxy client to multiple hosts.

    The default implementation provides load balancing based on active connections.
    It does not provide error handling or retrying.

    Args:
        *urls: base urls for requests
        **kwargs: same options as `AsyncClient`
    """

    # Selection logic is shared with the synchronous `Proxy`.
    Stats = Proxy.Stats
    priority = Proxy.priority
    choice = Proxy.choice

    def __init__(self, *urls: str, **kwargs):
        # The base url is a placeholder; requests are sent to a chosen url.
        super().__init__('https://proxies', **kwargs)
        self.urls = {}
        for url in urls:
            # Each host url is normalized to end with exactly one slash.
            self.urls[url.rstrip('/') + '/'] = self.Stats()

    @classmethod
    def clone(cls, other, path=''):
        """Return a new proxy with `path` joined onto each url of `other`."""
        urls = [urljoin(base, path) for base in other.urls]
        return cls(*urls, trailing=other.trailing, **other._attrs)

    async def request(self, method, path, **kwargs):
        """Send request with relative or absolute path and return response."""
        url = self.choice(method)
        with self.urls[url] as stats:
            response = await super().request(method, urljoin(url, path), **kwargs)
        failed = response.status_code >= 500
        stats.add(failures=int(failed))
        return response

request(method, path, **kwargs) async

Send request with relative or absolute path and return response.

Source code in clients/aio.py
126
127
128
129
130
131
132
async def request(self, method, path, **kwargs):
    """Send the request to the chosen url and record whether it failed (500s)."""
    url = self.choice(method)
    with self.urls[url] as stats:
        response = await super().request(method, urljoin(url, path), **kwargs)
    failed = response.status_code >= 500
    stats.add(failures=int(failed))
    return response

clients.singleton(*args, **kwargs)

Return a decorator for singleton class instances.

Source code in clients/__init__.py
7
8
9
def singleton(*args, **kwargs):
    """Return a class decorator which replaces the class with an instance of it.

    The instance is constructed with the given positional and keyword arguments.
    """

    def instantiate(cls):
        return cls(*args, **kwargs)

    return instantiate