Skip to content

Reference

Note

Spatial lookups require gis to be enabled.

Mixin for field lookups.

Source code in model_values/__init__.py
class Lookup:
    """Mixin for field lookups."""

    __ne__ = eq("ne")
    __lt__ = eq("lt")
    __le__ = eq("lte")
    __gt__ = eq("gt")
    __ge__ = eq("gte")
    iexact = eq("iexact")
    icontains = eq("icontains")
    startswith = eq("startswith")
    istartswith = eq("istartswith")
    endswith = eq("endswith")
    iendswith = eq("iendswith")
    regex = eq("regex")
    iregex = eq("iregex")
    isin = eq("in")
    # spatial lookups
    contained = eq("contained")
    coveredby = eq("coveredby")
    covers = eq("covers")
    crosses = eq("crosses")
    disjoint = eq("disjoint")
    equals = eq("equals")  # __eq__ is taken
    intersects = eq("intersects")  # __and__ is ambiguous
    touches = eq("touches")
    __lshift__ = left = eq("left")
    __rshift__ = right = eq("right")
    above = eq("strictly_above")
    below = eq("strictly_below")

    def __eq__(self, value, lookup: str = ""): ...

    def range(self, *values):
        """range"""
        return self.__eq__(values, "__range")

    def relate(self, *values):
        """relate"""
        return self.__eq__(values, "__relate")

    @property
    def is_valid(self):
        """Whether field `isvalid`."""
        return self.__eq__(True, "__isvalid")

    def contains(self, value, properly=False, bb=False):
        """Return whether field `contains` the value.  Options apply only to geom fields.

        Args:
            properly: `contains_properly`
            bb: bounding box, `bbcontains`
        """
        properly = "_properly" * bool(properly)
        bb = "bb" * bool(bb)
        return self.__eq__(value, f"__{bb}contains{properly}")

    def overlaps(self, geom, position="", bb=False):
        """Return whether field `overlaps` with geometry .

        Args:
            position: `overlaps_{left, right, above, below}`
            bb: bounding box, `bboverlaps`
        """
        bb = "bb" * bool(bb)
        return self.__eq__(geom, f"__{bb}overlaps_{position}".rstrip("_"))

    def within(self, geom, distance=None):
        """Return whether field is `within` geometry.

        Args:
            distance: `dwithin`
        """
        if distance is None:
            return self.__eq__(geom, "__within")
        return self.__eq__((geom, distance), "__dwithin")

is_valid property

Whether field isvalid.

contains(value, properly=False, bb=False)

Return whether field contains the value. Options apply only to geom fields.

Parameters:

Name Type Description Default
properly

contains_properly

False
bb

bounding box, bbcontains

False
Source code in model_values/__init__.py
def contains(self, value, properly=False, bb=False):
    """Return whether field `contains` the value.  Options apply only to geom fields.

    Args:
        properly: `contains_properly`
        bb: bounding box, `bbcontains`
    """
    properly = "_properly" * bool(properly)
    bb = "bb" * bool(bb)
    return self.__eq__(value, f"__{bb}contains{properly}")

overlaps(geom, position='', bb=False)

Return whether field overlaps with geometry .

Parameters:

Name Type Description Default
position

overlaps_{left, right, above, below}

''
bb

bounding box, bboverlaps

False
Source code in model_values/__init__.py
def overlaps(self, geom, position="", bb=False):
    """Return whether field `overlaps` with geometry .

    Args:
        position: `overlaps_{left, right, above, below}`
        bb: bounding box, `bboverlaps`
    """
    bb = "bb" * bool(bb)
    return self.__eq__(geom, f"__{bb}overlaps_{position}".rstrip("_"))

range(*values)

range

Source code in model_values/__init__.py
def range(self, *values):
    """range"""
    return self.__eq__(values, "__range")

relate(*values)

relate

Source code in model_values/__init__.py
def relate(self, *values):
    """relate"""
    return self.__eq__(values, "__relate")

within(geom, distance=None)

Return whether field is within geometry.

Parameters:

Name Type Description Default
distance

dwithin

None
Source code in model_values/__init__.py
def within(self, geom, distance=None):
    """Return whether field is `within` geometry.

    Args:
        distance: `dwithin`
    """
    if distance is None:
        return self.__eq__(geom, "__within")
    return self.__eq__((geom, distance), "__dwithin")

Note

Since attributes are used for constructing F objects, there may be collisions between field names and methods. For example, name is a reserved attribute, but the usual constructor can still be used: F('name').

Note

See source for available spatial functions if gis is configured.

Bases: F, Lookup

Create F, Q, and Func objects with expressions.

F creation supported as attributes: F.user == F('user'), F.user.created == F('user__created').

Q lookups supported as methods or operators: F.text.iexact(...) == Q(text__iexact=...), F.user.created >= ... == Q(user__created__gte=...).

Func objects also supported as methods: F.user.created.min() == Min('user__created').

Some Func objects can also be transformed into lookups, if registered: F.text.length() == Length(F('text')), F.text.length > 0 == Q(text__length__gt=0).

Source code in model_values/__init__.py
class F(models.F, Lookup, metaclass=MetaF):
    """Create `F`, `Q`, and `Func` objects with expressions.

    `F` creation supported as attributes:
    `F.user` == `F('user')`,
    `F.user.created` == `F('user__created')`.

    `Q` lookups supported as methods or operators:
    `F.text.iexact(...)` == `Q(text__iexact=...)`,
    `F.user.created >= ...` == `Q(user__created__gte=...)`.

    `Func` objects also supported as methods:
    `F.user.created.min()` == `Min('user__created')`.

    Some `Func` objects can also be transformed into lookups,
    if [registered](https://docs.djangoproject.com/en/stable/ref/models/database-functions/#length):
    `F.text.length()` == `Length(F('text'))`,
    `F.text.length > 0` == `Q(text__length__gt=0)`.
    """

    lookups = dict(
        length=functions.Length,
        lower=functions.Lower,
        upper=functions.Upper,
        chr=functions.Chr,
        ord=functions.Ord,
        acos=functions.ACos,
        asin=functions.ASin,
        atan=functions.ATan,
        atan2=functions.ATan2,
        cos=functions.Cos,
        cot=functions.Cot,
        degrees=functions.Degrees,
        exp=functions.Exp,
        radians=functions.Radians,
        sin=functions.Sin,
        sqrt=functions.Sqrt,
        tan=functions.Tan,
        sign=functions.Sign,
        md5=functions.MD5,
    )
    coalesce = method(functions.Coalesce)
    concat = method(functions.Concat)  # __add__ is taken
    min = method(models.Min)
    max = method(models.Max)
    sum = method(models.Sum)
    mean = method(models.Avg)
    var = method(models.Variance)
    std = method(models.StdDev)
    greatest = method(functions.Greatest)
    least = method(functions.Least)
    now = staticmethod(functions.Now)
    cast = method(functions.Cast)
    extract = method(functions.Extract)
    trunc = method(functions.Trunc)
    cume_dist = method(functions.CumeDist)
    dense_rank = method(functions.DenseRank)
    first_value = method(functions.FirstValue)
    lag = method(functions.Lag)
    last_value = method(functions.LastValue)
    lead = method(functions.Lead)
    nth_value = method(functions.NthValue)
    ntile = staticmethod(functions.Ntile)
    percent_rank = method(functions.PercentRank)
    rank = method(functions.Rank)
    row_number = method(functions.RowNumber)
    strip = method(functions.Trim)
    lstrip = method(functions.LTrim)
    rstrip = method(functions.RTrim)
    repeat = method(functions.Repeat)
    nullif = method(functions.NullIf)
    __reversed__ = method(functions.Reverse)
    __abs__ = method(functions.Abs)
    __ceil__ = method(functions.Ceil)
    __floor__ = method(functions.Floor)
    __mod__ = method(functions.Mod)
    pi = functions.Pi()
    __pow__ = method(functions.Power)
    __round__ = method(functions.Round)
    sha1 = method(functions.SHA1)
    sha224 = method(functions.SHA224)
    sha256 = method(functions.SHA256)
    sha384 = method(functions.SHA384)
    sha512 = method(functions.SHA512)
    collate = method(functions.Collate)
    json = staticmethod(functions.JSONObject)
    random = staticmethod(functions.Random)
    if gis:  # pragma: no cover
        area = property(gis.functions.Area)
        geojson = method(gis.functions.AsGeoJSON)
        gml = method(gis.functions.AsGML)
        kml = method(gis.functions.AsKML)
        svg = method(gis.functions.AsSVG)
        bounding_circle = method(gis.functions.BoundingCircle)
        centroid = property(gis.functions.Centroid)
        difference = method(gis.functions.Difference)
        envelope = property(gis.functions.Envelope)
        geohash = method(gis.functions.GeoHash)  # __hash__ requires an int
        intersection = method(gis.functions.Intersection)
        make_valid = method(gis.functions.MakeValid)
        mem_size = property(gis.functions.MemSize)
        num_geometries = property(gis.functions.NumGeometries)
        num_points = property(gis.functions.NumPoints)
        perimeter = property(gis.functions.Perimeter)
        point_on_surface = property(gis.functions.PointOnSurface)
        reverse = method(gis.functions.Reverse)
        scale = method(gis.functions.Scale)
        snap_to_grid = method(gis.functions.SnapToGrid)
        symmetric_difference = method(gis.functions.SymDifference)
        transform = method(gis.functions.Transform)
        translate = method(gis.functions.Translate)
        union = method(gis.functions.Union)
        azimuth = method(gis.functions.Azimuth)
        line_locate_point = method(gis.functions.LineLocatePoint)
        force_polygon_cw = method(gis.functions.ForcePolygonCW)

        @method
        class distance(gis.functions.Distance):
            """Return `Distance` with support for lookups: <, <=, >, >=, within."""

            __lt__ = method(transform, "distance_lt")
            __le__ = method(transform, "distance_lte")
            __gt__ = method(transform, "distance_gt")
            __ge__ = method(transform, "distance_gte")
            within = method(transform, "dwithin")

    def __getattr__(self, name: str) -> Self:
        """Return new [F][model_values.F] object with chained attribute."""
        return type(self)(f"{self.name}__{name}")

    def __eq__(self, value, lookup: str = "") -> models.Q:
        """Return `Q` object with lookup."""
        if not lookup and type(value) is models.F:
            return self.name == value.name
        return models.Q(**{self.name + lookup: value})

    def __ne__(self, value) -> models.Q:
        """Allow __ne=None lookup without custom queryset."""
        if value is None:
            return self.__eq__(False, "__isnull")
        return self.__eq__(value, "__ne")

    __hash__ = models.F.__hash__

    def __call__(self, *args, **extra) -> models.Func:
        name, _, func = self.name.rpartition("__")
        return self.lookups[func](name, *args, **extra)

    def __iter__(self):
        raise TypeError("'F' object is not iterable")

    def __getitem__(self, slc: slice) -> models.Func:
        """Return field `Substr` or `Right`."""
        assert (slc.stop or 0) >= 0 and slc.step is None
        start = slc.start or 0
        if start < 0:
            assert slc.stop is None
            return functions.Right(self, -start)
        size = slc.stop and max(slc.stop - start, 0)
        return functions.Substr(self, start + 1, size)

    def __rmod__(self, value):
        return functions.Mod(value, self)

    def __rpow__(self, value):
        return functions.Power(value, self)

    @method
    def count(self="*", **extra):
        """Return `Count` with optional field."""
        return models.Count(getattr(self, "name", self), **extra)

    def find(self, sub, **extra) -> models.Expression:
        """Return `StrIndex` with `str.find` semantics."""
        return functions.StrIndex(self, Value(sub), **extra) - 1

    def replace(self, old, new="", **extra) -> models.Func:
        """Return `Replace` with wrapped values."""
        return functions.Replace(self, Value(old), Value(new), **extra)

    def ljust(self, width: int, fill=" ", **extra) -> models.Func:
        """Return `LPad` with wrapped values."""
        return functions.LPad(self, width, Value(fill), **extra)

    def rjust(self, width: int, fill=" ", **extra) -> models.Func:
        """Return `RPad` with wrapped values."""
        return functions.RPad(self, width, Value(fill), **extra)

    def log(self, base=math.e, **extra) -> models.Func:
        """Return `Log`, by default `Ln`."""
        return functions.Log(self, base, **extra)

distance

Bases: Distance

Return Distance with support for lookups: <, <=, >, >=, within.

Source code in model_values/__init__.py
@method
class distance(gis.functions.Distance):
    """Return `Distance` with support for lookups: <, <=, >, >=, within."""

    __lt__ = method(transform, "distance_lt")
    __le__ = method(transform, "distance_lte")
    __gt__ = method(transform, "distance_gt")
    __ge__ = method(transform, "distance_gte")
    within = method(transform, "dwithin")

__eq__(value, lookup='')

Return Q object with lookup.

Source code in model_values/__init__.py
def __eq__(self, value, lookup: str = "") -> models.Q:
    """Return `Q` object with lookup."""
    if not lookup and type(value) is models.F:
        return self.name == value.name
    return models.Q(**{self.name + lookup: value})

__getattr__(name)

Return new [F][model_values.F] object with chained attribute.

Source code in model_values/__init__.py
def __getattr__(self, name: str) -> Self:
    """Return new [F][model_values.F] object with chained attribute."""
    return type(self)(f"{self.name}__{name}")

__getitem__(slc)

Return field Substr or Right.

Source code in model_values/__init__.py
def __getitem__(self, slc: slice) -> models.Func:
    """Return field `Substr` or `Right`."""
    assert (slc.stop or 0) >= 0 and slc.step is None
    start = slc.start or 0
    if start < 0:
        assert slc.stop is None
        return functions.Right(self, -start)
    size = slc.stop and max(slc.stop - start, 0)
    return functions.Substr(self, start + 1, size)

__ne__(value)

Allow __ne=None lookup without custom queryset.

Source code in model_values/__init__.py
def __ne__(self, value) -> models.Q:
    """Allow __ne=None lookup without custom queryset."""
    if value is None:
        return self.__eq__(False, "__isnull")
    return self.__eq__(value, "__ne")

count(**extra)

Return Count with optional field.

Source code in model_values/__init__.py
@method
def count(self="*", **extra):
    """Return `Count` with optional field."""
    return models.Count(getattr(self, "name", self), **extra)

find(sub, **extra)

Return StrIndex with str.find semantics.

Source code in model_values/__init__.py
def find(self, sub, **extra) -> models.Expression:
    """Return `StrIndex` with `str.find` semantics."""
    return functions.StrIndex(self, Value(sub), **extra) - 1

ljust(width, fill=' ', **extra)

Return LPad with wrapped values.

Source code in model_values/__init__.py
def ljust(self, width: int, fill=" ", **extra) -> models.Func:
    """Return `LPad` with wrapped values."""
    return functions.LPad(self, width, Value(fill), **extra)

log(base=math.e, **extra)

Return Log, by default Ln.

Source code in model_values/__init__.py
def log(self, base=math.e, **extra) -> models.Func:
    """Return `Log`, by default `Ln`."""
    return functions.Log(self, base, **extra)

replace(old, new='', **extra)

Return Replace with wrapped values.

Source code in model_values/__init__.py
def replace(self, old, new="", **extra) -> models.Func:
    """Return `Replace` with wrapped values."""
    return functions.Replace(self, Value(old), Value(new), **extra)

rjust(width, fill=' ', **extra)

Return RPad with wrapped values.

Source code in model_values/__init__.py
def rjust(self, width: int, fill=" ", **extra) -> models.Func:
    """Return `RPad` with wrapped values."""
    return functions.RPad(self, width, Value(fill), **extra)

Note

See source for available aggregate spatial functions if gis is configured.

Bases: QuerySet, Lookup

Source code in model_values/__init__.py
class QuerySet(models.QuerySet, Lookup):
    min = reduce(models.Min)
    max = reduce(models.Max)
    sum = reduce(models.Sum)
    mean = reduce(models.Avg)
    var = reduce(models.Variance)
    std = reduce(models.StdDev)
    __add__ = field(operator.add)
    __sub__ = field(operator.sub)
    __mul__ = field(operator.mul)
    __truediv__ = field(operator.truediv)
    __mod__ = field(operator.mod)
    __pow__ = field(operator.pow)
    if gis:  # pragma: no cover
        collect = reduce(gis.Collect)
        extent = reduce(gis.Extent)
        extent3d = reduce(gis.Extent3D)
        make_line = reduce(gis.MakeLine)
        union = reduce(gis.Union)

    @property
    def _flat(self):
        return issubclass(self._iterable_class, models.query.FlatValuesListIterable)

    @property
    def _named(self):
        return issubclass(self._iterable_class, models.query.NamedValuesListIterable)

    def __getitem__(self, key):
        """Allow column access by field names, expressions, or `F` objects.

        `qs[field]` returns flat `values_list`

        `qs[field, ...]` returns tupled `values_list`

        `qs[Q_obj]` provisionally returns filtered [QuerySet][model_values.QuerySet]
        """
        if isinstance(key, tuple):
            return self.values_list(*map(extract, key), named=True)
        key = extract(key)
        if isinstance(key, (str, models.Expression)):
            return self.values_list(key, flat=True)
        if isinstance(key, models.Q):
            return self.filter(key)
        return super().__getitem__(key)

    def __setitem__(self, key, value):
        """Update a single column."""
        self.update(**{key: value})

    def __eq__(self, value, lookup: str = "") -> Self:
        """Return [QuerySet][model_values.QuerySet] filtered by comparison to given value."""
        (field,) = self._fields  # type: ignore
        return self.filter(**{field + lookup: value})

    def __contains__(self, value):
        """Return whether value is present using `exists`."""
        if self._result_cache is None and self._flat:
            return (self == value).exists()
        return value in iter(self)

    def __iter__(self):
        """Iteration extended to support [group_by][model_values.QuerySet.group_by]."""
        if not hasattr(self, "_group_by"):
            return super().__iter__()
        size = len(self._group_by)
        rows = self[self._group_by + self._fields].order_by(*self._group_by).iterator()  # type: ignore
        groups = itertools.groupby(rows, key=operator.itemgetter(*range(size)))
        getter = operator.itemgetter(size if self._flat else slice(size, None))
        if self._named:
            Row = collections.namedtuple("Row", self._fields)
            getter = lambda tup: Row(*tup[size:])  # noqa: E731
        return ((key, map(getter, values)) for key, values in groups)

    def select(self, *fields, **annotations) -> Self:
        """Return annotated `values_list`."""
        return self.annotate(**annotations)[fields + tuple(annotations)]

    items = select  # deprecated name

    def group_by(self, *fields, **annotations) -> Self:
        """Return a grouped [QuerySet][model_values.QuerySet].

        The queryset is iterable in the same manner as `itertools.groupby`.
        Additionally the [reduce][model_values.QuerySet.reduce] functions will return annotated querysets.
        """
        qs = self.annotate(**annotations)
        qs._group_by = fields + tuple(annotations)  # type: ignore
        return qs

    groupby = group_by  # deprecated name

    def annotate(self, *args, **kwargs) -> Self:
        """Annotate extended to also handle mapping values, as a [Case][model_values.Case] expression.

        Args:
            **kwargs: `field={Q_obj: value, ...}, ...`

        As a provisional feature, an optional `default` key may be specified.
        """
        for field, value in kwargs.items():
            if Case.isa(value):
                kwargs[field] = Case.defaultdict(value)
        return super().annotate(*args, **kwargs)

    def alias(self, *args, **kwargs) -> Self:
        """Alias extended to also handle mapping values, as a [Case][model_values.Case] expression.

        Args:
            **kwargs: `field={Q_obj: value, ...}, ...`
        """
        for field, value in kwargs.items():
            if Case.isa(value):
                kwargs[field] = Case.defaultdict(value)
        return super().alias(*args, **kwargs)

    def value_counts(self, alias: str = "count") -> Self:
        """Return annotated value counts."""
        return self.select(*self._fields, **{alias: F.count()})

    def sort(self, reverse=False) -> Self:
        """Return [QuerySet][model_values.QuerySet] ordered by selected values."""
        qs = self.order_by(*self._fields)
        return qs.reverse() if reverse else qs

    sort_values = sort  # deprecated name

    def reduce(self, *funcs, **extra):
        """Return aggregated values, or an annotated [QuerySet][model_values.QuerySet].

        Args:
            *funcs: aggregation function classes
        """
        funcs = [func(field, **extra) for field, func in zip(self._fields, itertools.cycle(funcs))]
        if hasattr(self, "_group_by"):
            return self[self._group_by].annotate(*funcs)
        names = [func.default_alias for func in funcs]
        row = self.aggregate(*funcs)
        if self._named:
            return collections.namedtuple("Row", names)(**row)
        return row[names[0]] if self._flat else tuple(map(row.__getitem__, names))

    def update(self, **kwargs) -> int:
        """Update extended to also handle mapping values, as a [Case][model_values.Case] expression.

        Args:
            **kwargs: `field={Q_obj: value, ...}, ...`
        """
        for field, value in kwargs.items():
            if Case.isa(value):
                kwargs[field] = Case(value, default=F(field))
        return super().update(**kwargs)

    def change(self, defaults: Mapping = {}, **kwargs) -> int:
        """Update and return number of rows that actually changed.

        For triggering on-change logic without fetching first.

        `if qs.change(status=...):` status actually changed

        `qs.change({'last_modified': now}, status=...)` last_modified only updated if status updated

        Args:
            defaults: optional mapping which will be updated conditionally, as with `update_or_create`.
        """
        return self.exclude(**kwargs).update(**dict(defaults, **kwargs))

    def changed(self, **kwargs) -> dict:
        """Return first mapping of fields and values which differ in the db.

        Also efficient enough to be used in boolean contexts, instead of `exists`.
        """
        row = self.exclude(**kwargs).values(*kwargs).first() or {}
        return {field: value for field, value in row.items() if value != kwargs[field]}

    def exists(self, count: int = 1) -> bool:
        """Return whether there are at least the specified number of rows."""
        if count == 1:
            return super().exists()
        return (self[:count].count() if self._result_cache is None else len(self)) >= count

__contains__(value)

Return whether value is present using exists.

Source code in model_values/__init__.py
def __contains__(self, value):
    """Return whether value is present using `exists`."""
    if self._result_cache is None and self._flat:
        return (self == value).exists()
    return value in iter(self)

__eq__(value, lookup='')

Return [QuerySet][model_values.QuerySet] filtered by comparison to given value.

Source code in model_values/__init__.py
def __eq__(self, value, lookup: str = "") -> Self:
    """Return [QuerySet][model_values.QuerySet] filtered by comparison to given value."""
    (field,) = self._fields  # type: ignore
    return self.filter(**{field + lookup: value})

__getitem__(key)

Allow column access by field names, expressions, or F objects.

qs[field] returns flat values_list

qs[field, ...] returns tupled values_list

qs[Q_obj] provisionally returns filtered [QuerySet][model_values.QuerySet]

Source code in model_values/__init__.py
def __getitem__(self, key):
    """Allow column access by field names, expressions, or `F` objects.

    `qs[field]` returns flat `values_list`

    `qs[field, ...]` returns tupled `values_list`

    `qs[Q_obj]` provisionally returns filtered [QuerySet][model_values.QuerySet]
    """
    if isinstance(key, tuple):
        return self.values_list(*map(extract, key), named=True)
    key = extract(key)
    if isinstance(key, (str, models.Expression)):
        return self.values_list(key, flat=True)
    if isinstance(key, models.Q):
        return self.filter(key)
    return super().__getitem__(key)

__iter__()

Iteration extended to support [group_by][model_values.QuerySet.group_by].

Source code in model_values/__init__.py
def __iter__(self):
    """Iteration extended to support [group_by][model_values.QuerySet.group_by]."""
    if not hasattr(self, "_group_by"):
        return super().__iter__()
    size = len(self._group_by)
    rows = self[self._group_by + self._fields].order_by(*self._group_by).iterator()  # type: ignore
    groups = itertools.groupby(rows, key=operator.itemgetter(*range(size)))
    getter = operator.itemgetter(size if self._flat else slice(size, None))
    if self._named:
        Row = collections.namedtuple("Row", self._fields)
        getter = lambda tup: Row(*tup[size:])  # noqa: E731
    return ((key, map(getter, values)) for key, values in groups)

__setitem__(key, value)

Update a single column.

Source code in model_values/__init__.py
def __setitem__(self, key, value):
    """Update a single column."""
    self.update(**{key: value})

alias(*args, **kwargs)

Alias extended to also handle mapping values, as a [Case][model_values.Case] expression.

Parameters:

Name Type Description Default
**kwargs

field={Q_obj: value, ...}, ...

{}
Source code in model_values/__init__.py
def alias(self, *args, **kwargs) -> Self:
    """Alias extended to also handle mapping values, as a [Case][model_values.Case] expression.

    Args:
        **kwargs: `field={Q_obj: value, ...}, ...`
    """
    for field, value in kwargs.items():
        if Case.isa(value):
            kwargs[field] = Case.defaultdict(value)
    return super().alias(*args, **kwargs)

annotate(*args, **kwargs)

Annotate extended to also handle mapping values, as a [Case][model_values.Case] expression.

Parameters:

Name Type Description Default
**kwargs

field={Q_obj: value, ...}, ...

{}

As a provisional feature, an optional default key may be specified.

Source code in model_values/__init__.py
def annotate(self, *args, **kwargs) -> Self:
    """Annotate extended to also handle mapping values, as a [Case][model_values.Case] expression.

    Args:
        **kwargs: `field={Q_obj: value, ...}, ...`

    As a provisional feature, an optional `default` key may be specified.
    """
    for field, value in kwargs.items():
        if Case.isa(value):
            kwargs[field] = Case.defaultdict(value)
    return super().annotate(*args, **kwargs)

change(defaults={}, **kwargs)

Update and return number of rows that actually changed.

For triggering on-change logic without fetching first.

if qs.change(status=...): status actually changed

qs.change({'last_modified': now}, status=...) last_modified only updated if status updated

Parameters:

Name Type Description Default
defaults Mapping

optional mapping which will be updated conditionally, as with update_or_create.

{}
Source code in model_values/__init__.py
def change(self, defaults: Mapping = {}, **kwargs) -> int:
    """Update and return number of rows that actually changed.

    For triggering on-change logic without fetching first.

    `if qs.change(status=...):` status actually changed

    `qs.change({'last_modified': now}, status=...)` last_modified only updated if status updated

    Args:
        defaults: optional mapping which will be updated conditionally, as with `update_or_create`.
    """
    return self.exclude(**kwargs).update(**dict(defaults, **kwargs))

changed(**kwargs)

Return first mapping of fields and values which differ in the db.

Also efficient enough to be used in boolean contexts, instead of exists.

Source code in model_values/__init__.py
def changed(self, **kwargs) -> dict:
    """Return first mapping of fields and values which differ in the db.

    Also efficient enough to be used in boolean contexts, instead of `exists`.
    """
    row = self.exclude(**kwargs).values(*kwargs).first() or {}
    return {field: value for field, value in row.items() if value != kwargs[field]}

exists(count=1)

Return whether there are at least the specified number of rows.

Source code in model_values/__init__.py
def exists(self, count: int = 1) -> bool:
    """Return whether there are at least the specified number of rows."""
    if count == 1:
        return super().exists()
    return (self[:count].count() if self._result_cache is None else len(self)) >= count

group_by(*fields, **annotations)

Return a grouped [QuerySet][model_values.QuerySet].

The queryset is iterable in the same manner as itertools.groupby. Additionally the [reduce][model_values.QuerySet.reduce] functions will return annotated querysets.

Source code in model_values/__init__.py
def group_by(self, *fields, **annotations) -> Self:
    """Return a grouped [QuerySet][model_values.QuerySet].

    The queryset is iterable in the same manner as `itertools.groupby`.
    Additionally the [reduce][model_values.QuerySet.reduce] functions will return annotated querysets.
    """
    qs = self.annotate(**annotations)
    qs._group_by = fields + tuple(annotations)  # type: ignore
    return qs

reduce(*funcs, **extra)

Return aggregated values, or an annotated [QuerySet][model_values.QuerySet].

Parameters:

Name Type Description Default
*funcs

aggregation function classes

()
Source code in model_values/__init__.py
def reduce(self, *funcs, **extra):
    """Return aggregated values, or an annotated [QuerySet][model_values.QuerySet].

    Args:
        *funcs: aggregation function classes
    """
    funcs = [func(field, **extra) for field, func in zip(self._fields, itertools.cycle(funcs))]
    if hasattr(self, "_group_by"):
        return self[self._group_by].annotate(*funcs)
    names = [func.default_alias for func in funcs]
    row = self.aggregate(*funcs)
    if self._named:
        return collections.namedtuple("Row", names)(**row)
    return row[names[0]] if self._flat else tuple(map(row.__getitem__, names))

select(*fields, **annotations)

Return annotated values_list.

Source code in model_values/__init__.py
def select(self, *fields, **annotations) -> Self:
    """Return annotated `values_list`."""
    return self.annotate(**annotations)[fields + tuple(annotations)]

sort(reverse=False)

Return [QuerySet][model_values.QuerySet] ordered by selected values.

Source code in model_values/__init__.py
def sort(self, reverse=False) -> Self:
    """Return [QuerySet][model_values.QuerySet] ordered by selected values."""
    qs = self.order_by(*self._fields)
    return qs.reverse() if reverse else qs

update(**kwargs)

Update extended to also handle mapping values, as a [Case][model_values.Case] expression.

Parameters:

Name Type Description Default
**kwargs

field={Q_obj: value, ...}, ...

{}
Source code in model_values/__init__.py
def update(self, **kwargs) -> int:
    """Update extended to also handle mapping values, as a [Case][model_values.Case] expression.

    Args:
        **kwargs: `field={Q_obj: value, ...}, ...`
    """
    for field, value in kwargs.items():
        if Case.isa(value):
            kwargs[field] = Case(value, default=F(field))
    return super().update(**kwargs)

value_counts(alias='count')

Return annotated value counts.

Source code in model_values/__init__.py
def value_counts(self, alias: str = "count") -> Self:
    """Return annotated value counts."""
    return self.select(*self._fields, **{alias: F.count()})

Bases: Manager

Source code in model_values/__init__.py
class Manager(models.Manager):
    def get_queryset(self):
        return QuerySet(self.model, Query(self.model), self._db, self._hints)

    def __getitem__(self, pk) -> QuerySet:
        """Return [QuerySet][model_values.QuerySet] which matches primary key.

        To encourage direct db access, instead of always using get and save.
        """
        return self.filter(pk=pk)

    def __delitem__(self, pk):
        """Delete row with primary key."""
        self[pk].delete()

    def __contains__(self, pk):
        """Return whether primary key is present using `exists`."""
        return self[pk].exists()

    def upsert(self, defaults: Mapping = {}, **kwargs) -> int | models.Model:
        """Update or insert returning number of rows or created object.

        Faster and safer than `update_or_create`.
        Supports combined expression updates by assuming the identity element on insert:  `F(...) + 1`.

        Args:
            defaults: optional mapping which will be updated, as with `update_or_create`.
        """
        update = getattr(self.filter(**kwargs), "update" if defaults else "count")
        for field, value in defaults.items():
            expr = isinstance(value, models.expressions.CombinedExpression)
            kwargs[field] = value.rhs.value if expr else value
        try:
            with transaction.atomic():
                return update(**defaults) or self.create(**kwargs)
        except IntegrityError:
            return update(**defaults)

    def bulk_changed(self, field, data: Mapping, key: str = "pk") -> dict:
        """Return mapping of values which differ in the db.

        Args:
            field: value column
            data: `{pk: value, ...}`
            key: unique key column
        """
        rows = self.filter(F(key).isin(data))[key, field].iterator()
        return {pk: value for pk, value in rows if value != data[pk]}

    def bulk_change(
        self, field, data: Mapping, key: str = "pk", conditional=False, **kwargs
    ) -> int:
        """Update changed rows with a minimal number of queries, by inverting the data to use `pk__in`.

        Args:
            field: value column
            data: `{pk: value, ...}`
            key: unique key column
            conditional: execute select query and single conditional update;
                may be more efficient if the percentage of changed rows is relatively small
            **kwargs: additional fields to be updated
        """
        if conditional:
            data = {pk: data[pk] for pk in self.bulk_changed(field, data, key)}
        updates = collections.defaultdict(list)
        for pk in data:
            updates[data[pk]].append(pk)
        if conditional:
            kwargs[field] = {F(key).isin(tuple(updates[value])): value for value in updates}
            return self.filter(F(key).isin(data)).update(**kwargs)
        count = 0
        for value in updates:
            kwargs[field] = value
            count += self.filter((F(field) != value) & F(key).isin(updates[value])).update(**kwargs)
        return count

__contains__(pk)

Return whether primary key is present using exists.

Source code in model_values/__init__.py
def __contains__(self, pk):
    """Return whether primary key is present using `exists`."""
    return self[pk].exists()

__delitem__(pk)

Delete row with primary key.

Source code in model_values/__init__.py
def __delitem__(self, pk):
    """Delete row with primary key."""
    self[pk].delete()

__getitem__(pk)

Return [QuerySet][model_values.QuerySet] which matches primary key.

To encourage direct db access, instead of always using get and save.

Source code in model_values/__init__.py
def __getitem__(self, pk) -> QuerySet:
    """Return [QuerySet][model_values.QuerySet] which matches primary key.

    To encourage direct db access, instead of always using get and save.
    """
    return self.filter(pk=pk)

bulk_change(field, data, key='pk', conditional=False, **kwargs)

Update changed rows with a minimal number of queries, by inverting the data to use pk__in.

Parameters:

Name Type Description Default
field

value column

required
data Mapping

{pk: value, ...}

required
key str

unique key column

'pk'
conditional

execute select query and single conditional update; may be more efficient if the percentage of changed rows is relatively small

False
**kwargs

additional fields to be updated

{}
Source code in model_values/__init__.py
def bulk_change(
    self, field, data: Mapping, key: str = "pk", conditional=False, **kwargs
) -> int:
    """Update changed rows with a minimal number of queries, by inverting the data to use `pk__in`.

    Args:
        field: value column
        data: `{pk: value, ...}`
        key: unique key column
        conditional: execute select query and single conditional update;
            may be more efficient if the percentage of changed rows is relatively small
        **kwargs: additional fields to be updated
    """
    if conditional:
        data = {pk: data[pk] for pk in self.bulk_changed(field, data, key)}
    updates = collections.defaultdict(list)
    for pk in data:
        updates[data[pk]].append(pk)
    if conditional:
        kwargs[field] = {F(key).isin(tuple(updates[value])): value for value in updates}
        return self.filter(F(key).isin(data)).update(**kwargs)
    count = 0
    for value in updates:
        kwargs[field] = value
        count += self.filter((F(field) != value) & F(key).isin(updates[value])).update(**kwargs)
    return count

bulk_changed(field, data, key='pk')

Return mapping of values which differ in the db.

Parameters:

Name Type Description Default
field

value column

required
data Mapping

{pk: value, ...}

required
key str

unique key column

'pk'
Source code in model_values/__init__.py
def bulk_changed(self, field, data: Mapping, key: str = "pk") -> dict:
    """Return mapping of values which differ in the db.

    Args:
        field: value column
        data: `{pk: value, ...}`
        key: unique key column
    """
    rows = self.filter(F(key).isin(data))[key, field].iterator()
    return {pk: value for pk, value in rows if value != data[pk]}

upsert(defaults={}, **kwargs)

Update or insert returning number of rows or created object.

Faster and safer than update_or_create. Supports combined expression updates by assuming the identity element on insert: F(...) + 1.

Parameters:

Name Type Description Default
defaults Mapping

optional mapping which will be updated, as with update_or_create.

{}
Source code in model_values/__init__.py
def upsert(self, defaults: Mapping = {}, **kwargs) -> int | models.Model:
    """Update or insert returning number of rows or created object.

    Faster and safer than `update_or_create`.
    Supports combined expression updates by assuming the identity element on insert:  `F(...) + 1`.

    Args:
        defaults: optional mapping which will be updated, as with `update_or_create`.
    """
    update = getattr(self.filter(**kwargs), "update" if defaults else "count")
    for field, value in defaults.items():
        expr = isinstance(value, models.expressions.CombinedExpression)
        kwargs[field] = value.rhs.value if expr else value
    try:
        with transaction.atomic():
            return update(**defaults) or self.create(**kwargs)
    except IntegrityError:
        return update(**defaults)

Bases: Case

Case expression from mapping of when conditionals.

Parameters:

Name Type Description Default
conds

{Q_obj: value, ...}

required
default

optional default value or F object

required
Source code in model_values/__init__.py
class Case(models.Case):
    """`Case` expression from mapping of when conditionals.

    Args:
        conds: `{Q_obj: value, ...}`
        default: optional default value or `F` object
    """

    types = {
        str: models.CharField,
        int: models.IntegerField,
        float: models.FloatField,
        bool: models.BooleanField,
    }

    def __new__(cls, conds: Mapping, default=None, **extra):
        cases = (models.When(cond, Value(conds[cond])) for cond in conds)
        return models.Case(*cases, default=Value(default), **extra)

    @classmethod
    def defaultdict(cls, conds):
        conds = dict(conds)
        return cls(conds, default=conds.pop("default", None))

    @classmethod
    def isa(cls, value):
        return isinstance(value, Mapping) and any(isinstance(key, models.Q) for key in value)

Bases: property

A property bound to a class.

Source code in model_values/__init__.py
class classproperty(property):
    """A property bound to a class."""

    def __get__(self, instance, owner):
        return self.fget(owner)  # type: ignore

Return a CharField or IntegerField with choices from given enum.

By default, enum names and values are used as db values and display labels respectively, returning a CharField with computed max_length.

Parameters:

Name Type Description Default
display Callable | None

optional callable to transform enum names to display labels, thereby using enum values as db values and also supporting integers.

None
Source code in model_values/__init__.py
def EnumField(enum, display: Callable | None = None, **options) -> models.Field:
    """Return a `CharField` or `IntegerField` with choices from given enum.

    By default, enum names and values are used as db values and display labels respectively,
    returning a `CharField` with computed `max_length`.

    Args:
        display: optional callable to transform enum names to display labels,
            thereby using enum values as db values and also supporting integers.
    """
    choices = tuple((choice.name, choice.value) for choice in enum)
    if display is not None:
        choices = tuple((choice.value, display(choice.name)) for choice in enum)
    try:
        max_length = max(map(len, dict(choices)))
    except TypeError:
        return models.IntegerField(choices=choices, **options)
    return models.CharField(max_length=max_length, choices=choices, **options)