Skip to content

Reference

Client mixin.

Parameters:

Name Type Description Default
url str

base url for requests

required
trailing str

trailing chars (e.g. /) appended to the url

''
**attrs

additional Session attributes

{}
Source code in clients/base.py
class BaseClient:
    """Client mixin.

    Args:
        url: base url for requests
        trailing: trailing chars (e.g. /) appended to the url
        **attrs: additional Session attributes
    """

    def __init__(self, url: str, *, trailing: str = "", **attrs):
        super().__init__(base_url=url.rstrip("/") + "/", **attrs)  # type: ignore
        self._attrs = attrs
        self.trailing = trailing

    def __repr__(self):
        return f"{type(self).__name__}({self.url}... {self.trailing})"

    def __truediv__(self, path: str) -> Self:
        """Return a cloned client with appended path."""
        return type(self).clone(self, path)

    @property
    def url(self):
        return str(self.base_url)  # type: ignore

    @classmethod
    def clone(cls, other, path="", **kwargs):
        url = str(other.base_url.join(path))
        return cls(url, trailing=other.trailing, **(other._attrs | kwargs))

    def request(self, method, path, **kwargs):
        """Send request with relative or absolute path and return response."""
        url = str(self.base_url.join(path)).rstrip("/") + self.trailing  # type: ignore
        return super().request(method, url, **kwargs)  # type: ignore

    def get(self, path="", **kwargs):
        """GET request with optional path."""
        return self.request("GET", path, **kwargs)

    def options(self, path="", **kwargs):
        """OPTIONS request with optional path."""
        return self.request("OPTIONS", path, **kwargs)

    def head(self, path="", **kwargs):
        """HEAD request with optional path."""
        return self.request("HEAD", path, **kwargs)

    def post(self, path="", json=None, **kwargs):
        """POST request with optional path and json body."""
        return self.request("POST", path, json=json, **kwargs)

    def put(self, path="", json=None, **kwargs):
        """PUT request with optional path and json body."""
        return self.request("PUT", path, json=json, **kwargs)

    def patch(self, path="", json=None, **kwargs):
        """PATCH request with optional path and json body."""
        return self.request("PATCH", path, json=json, **kwargs)

    def delete(self, path="", **kwargs):
        """DELETE request with optional path."""
        return self.request("DELETE", path, **kwargs)

__truediv__(path)

Return a cloned client with appended path.

Source code in clients/base.py
def __truediv__(self, path: str) -> Self:
    """Return a cloned client with appended path."""
    return type(self).clone(self, path)

delete(path='', **kwargs)

DELETE request with optional path.

Source code in clients/base.py
def delete(self, path="", **kwargs):
    """Send a DELETE request through `self.request`; `path` is optional."""
    response = self.request("DELETE", path, **kwargs)
    return response

get(path='', **kwargs)

GET request with optional path.

Source code in clients/base.py
def get(self, path="", **kwargs):
    """Send a GET request through `self.request`; `path` is optional."""
    response = self.request("GET", path, **kwargs)
    return response

head(path='', **kwargs)

HEAD request with optional path.

Source code in clients/base.py
def head(self, path="", **kwargs):
    """Send a HEAD request through `self.request`; `path` is optional."""
    response = self.request("HEAD", path, **kwargs)
    return response

options(path='', **kwargs)

OPTIONS request with optional path.

Source code in clients/base.py
def options(self, path="", **kwargs):
    """Send an OPTIONS request through `self.request`; `path` is optional."""
    response = self.request("OPTIONS", path, **kwargs)
    return response

patch(path='', json=None, **kwargs)

PATCH request with optional path and json body.

Source code in clients/base.py
def patch(self, path="", json=None, **kwargs):
    """Send a PATCH request through `self.request`; `path` and `json` body are optional."""
    response = self.request("PATCH", path, json=json, **kwargs)
    return response

post(path='', json=None, **kwargs)

POST request with optional path and json body.

Source code in clients/base.py
def post(self, path="", json=None, **kwargs):
    """Send a POST request through `self.request`; `path` and `json` body are optional."""
    response = self.request("POST", path, json=json, **kwargs)
    return response

put(path='', json=None, **kwargs)

PUT request with optional path and json body.

Source code in clients/base.py
def put(self, path="", json=None, **kwargs):
    """Send a PUT request through `self.request`; `path` and `json` body are optional."""
    response = self.request("PUT", path, json=json, **kwargs)
    return response

request(method, path, **kwargs)

Send request with relative or absolute path and return response.

Source code in clients/base.py
def request(self, method, path, **kwargs):
    """Join `path` to the base url, apply `trailing`, and delegate to the parent `request`."""
    joined = str(self.base_url.join(path))  # type: ignore
    # Drop any trailing slashes, then append the configured suffix.
    url = joined.rstrip("/") + self.trailing
    return super().request(method, url, **kwargs)  # type: ignore

Bases: BaseClient, Client

Source code in clients/base.py
class Client(BaseClient, httpx.Client):
    def stream(self, method, path, **kwargs):
        """Stream the response for a relative or absolute path."""
        joined = str(self.base_url.join(path))
        # Same url construction as `request`: strip slashes, add `trailing`.
        url = joined.rstrip("/") + self.trailing
        return super().stream(method, url, **kwargs)

stream(method, path, **kwargs)

Send request with relative or absolute path and stream response.

Source code in clients/base.py
def stream(self, method, path, **kwargs):
    """Stream the response for a relative or absolute path."""
    joined = str(self.base_url.join(path))
    # Drop any trailing slashes, then append the configured suffix.
    url = joined.rstrip("/") + self.trailing
    return super().stream(method, url, **kwargs)

Bases: Client

A Client which returns json content and has syntactic support for requests.

Source code in clients/base.py
class Resource(Client):
    """A `Client` which returns json content and has syntactic support for requests."""

    client = property(Client.clone, doc="upcasted `Client`")
    __getitem__ = Client.get
    __setitem__ = Client.put
    __delitem__ = Client.delete
    __getattr__ = Client.__truediv__
    content_type = staticmethod(
        functools.partial(content_type, text="text/", json=r"application/(\w|\.)*\+?json")
    )

    def request(self, method, path, **kwargs):
        """Send request with path and return processed content."""
        response = super().request(method, path, **kwargs).raise_for_status()
        match self.content_type(response):
            case "json":
                return response.json()
            case "text":
                return response.text
        return response.content

    def stream(self, method: str = "GET", path: str = "", **kwargs) -> Iterator:
        """Iterate lines or chunks from streamed request."""
        with super().stream(method, path, **kwargs) as response:
            match self.content_type(response.raise_for_status()):
                case "json":
                    yield from map(json.loads, response.iter_lines())
                case "text":
                    yield from response.iter_lines()
                case _:
                    yield from response.iter_bytes()

    __iter__ = stream

    def __contains__(self, path: str):
        """Return whether endpoint exists according to HEAD request."""
        return not super().request("HEAD", path).is_error

    def __call__(self, path: str = "", **params):
        """GET request with params."""
        return self.get(path, params=params)

    def updater(self, path="", **kwargs):
        response = super().request("GET", path, **kwargs).raise_for_status()
        kwargs["headers"] = dict(kwargs.get("headers", {}), **validate(response))
        yield self.put(path, (yield response.json()), **kwargs)

    @contextlib.contextmanager
    def updating(self, path: str = "", **kwargs):
        """Context manager to GET and conditionally PUT json data."""
        updater = self.updater(path, **kwargs)
        json = next(updater)
        yield json
        updater.send(json)

    def update(self, path: str = "", callback: Callable | None = None, **json):
        """PATCH request with json params.

        Args:
            callback: optionally update with GET and validated PUT.
                `callback` is called on the json result with keyword params, i.e.,
                `dict` correctly implements the simple update case.
        """
        if callback is None:
            return self.patch(path, json=json)
        updater = self.updater(path)
        return updater.send(callback(next(updater), **json))

    def create(self, path: str = "", json=None, **kwargs) -> str:
        """POST request and return location."""
        response = super().request("POST", path, json=json, **kwargs).raise_for_status()
        return response.headers.get("location")

    def download(self, file, path: str = "", **kwargs):
        """Output streamed GET request to file."""
        for chunk in self.stream(path=path, **kwargs):
            file.write(chunk)
        return file

    def authorize(self, path: str = "", **kwargs) -> dict:
        """Acquire oauth access token and set `Authorization` header."""
        method = "GET" if {"json", "data"}.isdisjoint(kwargs) else "POST"
        result = self.request(method, path, **kwargs)
        self.headers["authorization"] = f"{result['token_type']} {result['access_token']}"
        return result

__call__(path='', **params)

GET request with params.

Source code in clients/base.py
def __call__(self, path: str = "", **params):
    """Send a GET request, passing the keyword arguments as query `params`."""
    query = params
    return self.get(path, params=query)

__contains__(path)

Return whether endpoint exists according to HEAD request.

Source code in clients/base.py
def __contains__(self, path: str):
    """Report whether a HEAD request to `path` returns a non-error response."""
    response = super().request("HEAD", path)
    return not response.is_error

authorize(path='', **kwargs)

Acquire oauth access token and set Authorization header.

Source code in clients/base.py
def authorize(self, path: str = "", **kwargs) -> dict:
    """Acquire oauth access token and set `Authorization` header.

    A POST is used when `json` or `data` is supplied, otherwise a GET.
    The header is also stored in the attrs used by `clone`, so clients
    derived from this one stay authorized.
    """
    method = "GET" if {"json", "data"}.isdisjoint(kwargs) else "POST"
    result = self.request(method, path, **kwargs)
    self.headers["authorization"] = f"{result['token_type']} {result['access_token']}"
    # Clones are rebuilt from `_attrs`; without this they would drop the new header.
    self._attrs["headers"] = self.headers
    return result

create(path='', json=None, **kwargs)

POST request and return location.

Source code in clients/base.py
def create(self, path: str = "", json=None, **kwargs) -> str:
    """Send a POST request and return the response's `location` header."""
    raw = super().request("POST", path, json=json, **kwargs)
    response = raw.raise_for_status()
    headers = response.headers
    return headers.get("location")

download(file, path='', **kwargs)

Output streamed GET request to file.

Source code in clients/base.py
def download(self, file, path: str = "", **kwargs):
    """Write each chunk of a streamed GET request to `file` and return `file`."""
    chunks = self.stream(path=path, **kwargs)
    for piece in chunks:
        file.write(piece)
    return file

request(method, path, **kwargs)

Send request with path and return processed content.

Source code in clients/base.py
def request(self, method, path, **kwargs):
    """Send request with path; return json, text, or bytes depending on content type."""
    response = super().request(method, path, **kwargs).raise_for_status()
    kind = self.content_type(response)
    if kind == "json":
        return response.json()
    if kind == "text":
        return response.text
    # Anything else is returned as raw bytes.
    return response.content

stream(method='GET', path='', **kwargs)

Iterate lines or chunks from streamed request.

Source code in clients/base.py
def stream(self, method: str = "GET", path: str = "", **kwargs) -> Iterator:
    """Yield parsed json lines, text lines, or byte chunks from a streamed request."""
    with super().stream(method, path, **kwargs) as response:
        kind = self.content_type(response.raise_for_status())
        if kind == "json":
            # Each line is decoded on its own with `json.loads`.
            yield from map(json.loads, response.iter_lines())
        elif kind == "text":
            yield from response.iter_lines()
        else:
            yield from response.iter_bytes()

update(path='', callback=None, **json)

PATCH request with json params.

Parameters:

Name Type Description Default
callback Callable | None

optionally update with GET and validated PUT. callback is called on the json result with keyword params, i.e., dict correctly implements the simple update case.

None
Source code in clients/base.py
def update(self, path: str = "", callback: Callable | None = None, **json):
    """PATCH request with json params.

    Args:
        callback: optionally update with GET and validated PUT.
            `callback` is called on the json result with keyword params, i.e.,
            `dict` correctly implements the simple update case.
    """
    if callback is None:
        return self.patch(path, json=json)
    updater = self.updater(path)
    return updater.send(callback(next(updater), **json))

updating(path='', **kwargs)

Context manager to GET and conditionally PUT json data.

Source code in clients/base.py
@contextlib.contextmanager
def updating(self, path: str = "", **kwargs):
    """Context manager which GETs json on entry and sends it back for a PUT on exit."""
    gen = self.updater(path, **kwargs)
    data = next(gen)
    yield data
    # Reached only when the `with` body completes without raising.
    gen.send(data)

Bases: Client

A Client which defaults to posts with json bodies, i.e., RPC.

Parameters:

Name Type Description Default
url str

base url for requests

required
json Mapping

default json body for all calls

{}
**kwargs

same options as Client

{}
Source code in clients/base.py
class Remote(Client):
    """A `Client` which defaults to posts with json bodies, i.e., RPC.

    Args:
        url: base url for requests
        json: default json body for all calls
        **kwargs: same options as `Client`
    """

    # Reuse the attribute-as-path and upcasting behavior defined on `Resource`.
    __getattr__ = Resource.__getattr__
    client = Resource.client

    def __init__(self, url: str, json: Mapping = {}, **kwargs):
        super().__init__(url, **kwargs)
        # Stored as a fresh dict; the argument itself is never mutated.
        self.json = dict(json)

    @classmethod
    def clone(cls, other, path=""):
        """Clone `other` with `path` appended, carrying over its default json body."""
        base_clone = Client.clone.__func__
        return base_clone(cls, other, path, json=other.json)

    def __call__(self, path: str = "", **json):
        """POST request with json body and [check][clients.base.Remote.check] result."""
        body = dict(self.json, **json)
        response = self.post(path, json=body).raise_for_status()
        return self.check(response.json())

    @staticmethod
    def check(result):
        """Hook for APIs which don't use status codes: return the result or raise an error."""
        return result

__call__(path='', **json)

POST request with json body and check result.

Source code in clients/base.py
def __call__(self, path: str = "", **json):
    """POST request with json body and [check][clients.base.Remote.check] result."""
    # Call-specific keys override the client's default body.
    body = dict(self.json, **json)
    response = self.post(path, json=body).raise_for_status()
    return self.check(response.json())

check(result) staticmethod

Override to return result or raise error, for APIs which don't use status codes.

Source code in clients/base.py
@staticmethod
def check(result):
    """Hook for APIs which don't use status codes: return the result or raise an error.

    This default passes `result` through unchanged.
    """
    return result

Bases: Remote

A Remote client which executes GraphQL queries.

Source code in clients/base.py
class Graph(Remote):
    """A `Remote` client which executes GraphQL queries."""

    # Same error type as `AsyncGraph`, which shares `check`, so GraphQL errors
    # can be handled alongside other httpx errors.
    Error = httpx.HTTPError

    @classmethod
    def check(cls, result: dict):
        """Return `data` or raise `errors`.

        Only the first entry of `errors` is raised, wrapped in `cls.Error`.
        """
        for error in result.get("errors", ()):
            raise cls.Error(error)
        return result.get("data")

    def execute(self, query: str, **variables):
        """Execute query over POST, sending the keyword arguments as `variables`."""
        return self(query=query, variables=variables)

check(result) classmethod

Return data or raise errors.

Source code in clients/base.py
@classmethod
def check(cls, result: dict):
    """Return the `data` entry, or raise `cls.Error` for the first of `errors`."""
    reported = iter(result.get("errors", ()))
    try:
        first = next(reported)
    except StopIteration:
        # No errors reported: hand back the payload (None when absent).
        return result.get("data")
    raise cls.Error(first)

execute(query, **variables)

Execute query over POST.

Source code in clients/base.py
def execute(self, query: str, **variables):
    """Call the client with `query` and the keyword arguments as `variables`."""
    payload = {"query": query, "variables": variables}
    return self(**payload)

Bases: Client

An extensible embedded proxy client to multiple hosts.

The default implementation provides load balancing based on active connections. It does not provide error handling or retrying.

Parameters:

Name Type Description Default
*urls str

base urls for requests

()
**kwargs

same options as Client

{}
Source code in clients/base.py
class Proxy(Client):
    """An extensible embedded proxy client to multiple hosts.

    The default implementation provides load balancing based on active connections.
    It does not provide error handling or retrying.

    Args:
        *urls: base urls for requests
        **kwargs: same options as `Client`
    """

    Stats = Stats

    def __init__(self, *urls: str, **kwargs):
        # The base url is a placeholder; requests go to one of `urls`.
        super().__init__("https://proxies", **kwargs)
        # One stats object per host, keyed by the url with a single trailing slash.
        self.urls = {f"{url.rstrip('/')}/": self.Stats() for url in urls}

    @classmethod
    def clone(cls, other, path=""):
        """Clone `other` with `path` joined onto every host url."""
        joined = [urljoin(url, path) for url in other.urls]
        return cls(*joined, trailing=other.trailing, **other._attrs)

    def priority(self, url: str):
        """Return comparable priority for url.

        Minimizes errors, failures (500s), and active connections.
        None may be used to eliminate from consideration.
        """
        stats = self.urls[url]
        return (stats["errors"], stats["failures"], stats["connections"])

    def choice(self, method: str) -> str:
        """Return chosen url according to priority.

        Args:
            method: placeholder for extensions which distinguish read/write requests
        """
        grouped = {}
        for url in self.urls:
            grouped.setdefault(self.priority(url), []).append(url)
        # Urls whose priority is None are excluded.
        grouped.pop(None, None)
        best = min(grouped)
        # Ties at the best priority are broken at random.
        return random.choice(grouped[best])

    def request(self, method, path, **kwargs):
        """Send request with relative or absolute path and return response."""
        host = self.choice(method)
        with self.urls[host] as stats:
            target = urljoin(host, path)
            response = super().request(method, target, **kwargs)
        # Server errors are recorded against the chosen host.
        failed = int(response.is_server_error)
        stats.add(failures=failed)
        return response

choice(method)

Return chosen url according to priority.

Parameters:

Name Type Description Default
method str

placeholder for extensions which distinguish read/write requests

required
Source code in clients/base.py
def choice(self, method: str) -> str:
    """Return chosen url according to priority.

    Args:
        method: placeholder for extensions which distinguish read/write requests
    """
    grouped = {}
    for url in self.urls:
        grouped.setdefault(self.priority(url), []).append(url)
    # Urls whose priority is None are excluded.
    grouped.pop(None, None)
    best = min(grouped)
    # Ties at the best priority are broken at random.
    return random.choice(grouped[best])

priority(url)

Return comparable priority for url.

Minimizes errors, failures (500s), and active connections. None may be used to eliminate from consideration.

Source code in clients/base.py
def priority(self, url: str):
    """Return comparable priority for url.

    The tuple orders by errors, then failures (500s), then active connections.
    None may be used to eliminate from consideration.
    """
    stats = self.urls[url]
    return (stats["errors"], stats["failures"], stats["connections"])

request(method, path, **kwargs)

Send request with relative or absolute path and return response.

Source code in clients/base.py
def request(self, method, path, **kwargs):
    """Send request with relative or absolute path to the chosen host and return response."""
    host = self.choice(method)
    with self.urls[host] as stats:
        target = urljoin(host, path)
        response = super().request(method, target, **kwargs)
    # Server errors are recorded against the chosen host.
    failed = int(response.is_server_error)
    stats.add(failures=failed)
    return response

Bases: BaseClient, AsyncClient

Source code in clients/aio.py
class AsyncClient(BaseClient, httpx.AsyncClient):
    def run(self, name: str, *args, **kwargs):
        """Call the named method and run the resulting coroutine to completion synchronously."""
        # A fresh event loop is created for each call.
        loop = asyncio.new_event_loop()
        coroutine = getattr(self, name)(*args, **kwargs)
        return loop.run_until_complete(coroutine)

run(name, *args, **kwargs)

Synchronously call method and run coroutine.

Source code in clients/aio.py
def run(self, name: str, *args, **kwargs):
    """Call the named method and run the resulting coroutine to completion synchronously."""
    # A fresh event loop is created for each call.
    loop = asyncio.new_event_loop()
    coroutine = getattr(self, name)(*args, **kwargs)
    return loop.run_until_complete(coroutine)

Bases: AsyncClient

An AsyncClient which returns json content and has syntactic support for requests.

Source code in clients/aio.py
class AsyncResource(AsyncClient):
    """An `AsyncClient` which returns json content and has syntactic support for requests."""

    client = property(AsyncClient.clone, doc="upcasted `AsyncClient`")
    # Attribute access clones with an appended path; indexing issues a GET.
    __getattr__ = AsyncClient.__truediv__
    __getitem__ = AsyncClient.get
    # Shared with the synchronous `Resource`.
    content_type = staticmethod(Resource.content_type)
    __call__ = Resource.__call__

    async def request(self, method, path, **kwargs):
        """Send request with path; return json, text, or bytes depending on content type."""
        raw = await super().request(method, path, **kwargs)
        response = raw.raise_for_status()
        kind = self.content_type(response)
        if kind == "json":
            return response.json()
        if kind == "text":
            return response.text
        return response.content

    async def updater(self, path="", **kwargs):
        """Async generator backing `updating` and `update`.

        Yields the json from a GET, receives the new json via `asend`, then
        yields the result of the PUT. Headers returned by `validate(response)`
        are merged into the PUT request.
        """
        raw = await super().request("GET", path, **kwargs)
        response = raw.raise_for_status()
        kwargs["headers"] = dict(kwargs.get("headers", {}), **validate(response))
        yield await self.put(path, (yield response.json()), **kwargs)

    @contextlib.asynccontextmanager
    async def updating(self, path: str = "", **kwargs):
        """Context manager which GETs json on entry and sends it back for a PUT on exit."""
        gen = self.updater(path, **kwargs)
        data = await gen.__anext__()
        yield data
        await gen.asend(data)

    async def update(self, path: str = "", callback: Callable | None = None, **json):
        """PATCH request with json params.

        Args:
            callback: when given, a GET followed by a validated PUT is used instead.
                `callback` receives the fetched json plus the keyword params and
                returns the new body; `dict` implements the simple update case.
        """
        if callback is not None:
            updater = self.updater(path)
            current = await updater.__anext__()
            return await updater.asend(callback(current, **json))
        return await self.patch(path, json)

    async def authorize(self, path: str = "", **kwargs) -> dict:
        """Acquire oauth access token and set `Authorization` header."""
        has_body = not {"json", "data"}.isdisjoint(kwargs)
        method = "POST" if has_body else "GET"
        result = await self.request(method, path, **kwargs)
        token = f"{result['token_type']} {result['access_token']}"
        self.headers["authorization"] = token
        # Stored in the attrs which `clone` passes to new clients.
        self._attrs["headers"] = self.headers
        return result

authorize(path='', **kwargs) async

Acquire oauth access token and set Authorization header.

Source code in clients/aio.py
async def authorize(self, path: str = "", **kwargs) -> dict:
    """Acquire oauth access token and set `Authorization` header.

    A POST is used when `json` or `data` is supplied, otherwise a GET.
    """
    has_body = not {"json", "data"}.isdisjoint(kwargs)
    method = "POST" if has_body else "GET"
    result = await self.request(method, path, **kwargs)
    token = f"{result['token_type']} {result['access_token']}"
    self.headers["authorization"] = token
    # Stored in `_attrs` as well as on this client's headers.
    self._attrs["headers"] = self.headers
    return result

request(method, path, **kwargs) async

Send request with path and return processed content.

Source code in clients/aio.py
async def request(self, method, path, **kwargs):
    """Send request with path; return json, text, or bytes depending on content type."""
    raw = await super().request(method, path, **kwargs)
    response = raw.raise_for_status()
    kind = self.content_type(response)
    if kind == "json":
        return response.json()
    if kind == "text":
        return response.text
    # Anything else is returned as raw bytes.
    return response.content

update(path='', callback=None, **json) async

PATCH request with json params.

Parameters:

Name Type Description Default
callback Callable | None

optionally update with GET and validated PUT. callback is called on the json result with keyword params, i.e., dict correctly implements the simple update case.

None
Source code in clients/aio.py
async def update(self, path: str = "", callback: Callable | None = None, **json):
    """PATCH request with json params.

    Args:
        callback: optionally update with GET and validated PUT.
            `callback` is called on the json result with keyword params, i.e.,
            `dict` correctly implements the simple update case.
    """
    if callback is None:
        return await self.patch(path, json)
    updater = self.updater(path)
    return await updater.asend(callback(await updater.__anext__(), **json))

updating(path='', **kwargs) async

Context manager to GET and conditionally PUT json data.

Source code in clients/aio.py
@contextlib.asynccontextmanager
async def updating(self, path: str = "", **kwargs):
    """Context manager which GETs json on entry and sends it back for a PUT on exit."""
    gen = self.updater(path, **kwargs)
    data = await gen.__anext__()
    yield data
    # Reached only when the `async with` body completes without raising.
    await gen.asend(data)

Bases: AsyncClient

An AsyncClient which defaults to posts with json bodies, i.e., RPC.

Parameters:

Name Type Description Default
url str

base url for requests

required
json Mapping

default json body for all calls

{}
**kwargs

same options as AsyncClient

{}
Source code in clients/aio.py
class AsyncRemote(AsyncClient):
    """An `AsyncClient` which defaults to posts with json bodies, i.e., RPC.

    Args:
        url: base url for requests
        json: default json body for all calls
        **kwargs: same options as `AsyncClient`
    """

    # Reuse behavior defined on `AsyncResource` and the synchronous `Remote`.
    check = staticmethod(Remote.check)
    __getattr__ = AsyncResource.__getattr__
    client = AsyncResource.client

    def __init__(self, url: str, json: Mapping = {}, **kwargs):
        super().__init__(url, **kwargs)
        # Stored as a fresh dict; the argument itself is never mutated.
        self.json = dict(json)

    @classmethod
    def clone(cls, other, path=""):
        """Clone `other` with `path` appended, carrying over its default json body."""
        base_clone = AsyncClient.clone.__func__
        return base_clone(cls, other, path, json=other.json)

    async def __call__(self, path="", **json):
        """POST request with json body and check result."""
        body = dict(self.json, **json)
        response = (await self.post(path, json=body)).raise_for_status()
        return self.check(response.json())

__call__(path='', **json) async

POST request with json body and check result.

Source code in clients/aio.py
async def __call__(self, path="", **json):
    """POST request with json body and check result."""
    # Call-specific keys override the client's default body.
    body = dict(self.json, **json)
    raw = await self.post(path, json=body)
    response = raw.raise_for_status()
    return self.check(response.json())

Bases: AsyncRemote

An AsyncRemote client which executes GraphQL queries.

Source code in clients/aio.py
class AsyncGraph(AsyncRemote):
    """An `AsyncRemote` client which executes GraphQL queries."""

    # `check` is rebound as a classmethod so `cls.Error` resolves on this class.
    check = classmethod(Graph.check.__func__)
    execute = Graph.execute
    Error = httpx.HTTPError

Bases: AsyncClient

An extensible embedded proxy client to multiple hosts.

The default implementation provides load balancing based on active connections. It does not provide error handling or retrying.

Parameters:

Name Type Description Default
*urls str

base urls for requests

()
**kwargs

same options as AsyncClient

{}
Source code in clients/aio.py
class AsyncProxy(AsyncClient):
    """An extensible embedded proxy client to multiple hosts.

    The default implementation provides load balancing based on active connections.
    It does not provide error handling or retrying.

    Args:
        *urls: base urls for requests
        **kwargs: same options as `AsyncClient`
    """

    # Host selection is shared with the synchronous `Proxy`.
    choice = Proxy.choice
    priority = Proxy.priority
    Stats = Proxy.Stats

    def __init__(self, *urls: str, **kwargs):
        # The base url is a placeholder; requests go to one of `urls`.
        super().__init__("https://proxies", **kwargs)
        # One stats object per host, keyed by the url with a single trailing slash.
        self.urls = {f"{url.rstrip('/')}/": self.Stats() for url in urls}

    @classmethod
    def clone(cls, other, path=""):
        """Clone `other` with `path` joined onto every host url."""
        joined = [urljoin(url, path) for url in other.urls]
        return cls(*joined, trailing=other.trailing, **other._attrs)

    async def request(self, method, path, **kwargs):
        """Send request with relative or absolute path and return response."""
        host = self.choice(method)  # type: ignore
        with self.urls[host] as stats:
            target = urljoin(host, path)
            response = await super().request(method, target, **kwargs)
        # Responses with status 500 or above are recorded against the chosen host.
        failed = int(response.status_code >= 500)
        stats.add(failures=failed)
        return response

request(method, path, **kwargs) async

Send request with relative or absolute path and return response.

Source code in clients/aio.py
async def request(self, method, path, **kwargs):
    """Send request with relative or absolute path to the chosen host and return response."""
    host = self.choice(method)  # type: ignore
    with self.urls[host] as stats:
        target = urljoin(host, path)
        response = await super().request(method, target, **kwargs)
    # Responses with status 500 or above are recorded against the chosen host.
    failed = int(response.status_code >= 500)
    stats.add(failures=failed)
    return response

Return a decorator for singleton class instances.

Source code in clients/__init__.py
5
6
7
def singleton(*args, **kwargs):
    """Return a class decorator which replaces the class with an instance of it.

    The instance is created by calling the class with the given arguments.
    """

    def instantiate(cls):
        return cls(*args, **kwargs)

    return instantiate