Core Reference

graphique.core.Parquet

Bases: Dataset

Partitioned parquet dataset.

Source code in graphique/core.py
class Parquet(ds.Dataset):
    """Partitioned parquet dataset."""

    def schema(self) -> pa.Schema:
        """partition schema"""
        return self.partitioning.schema if hasattr(self, 'partitioning') else pa.schema([])

    def keys(self, *names) -> list:
        """Return prefix of matching partition keys."""
        keys = set(Parquet.schema(self).names)
        return list(itertools.takewhile(lambda name: name.lstrip('-') in keys, names))

    def fragments(self, counts: str = '') -> ibis.Table:
        """Return partition fragments as a table."""
        parts = []
        for frag in self._get_fragments(self._scan_options.get('filter')):
            parts.append(ds.get_partition_keys(frag.partition_expression))
            parts[-1]['__path__'] = frag.path
            if counts:
                parts[-1][counts] = frag.count_rows()
        return ibis.memtable(pa.Table.from_pylist(parts))

    def group(self, *names, counts: str = '') -> ibis.Table:
        """Return grouped partitions as a table."""
        table = Parquet.fragments(self, counts)
        agg = {counts: table[counts].sum()} if counts else {}
        return table.aggregate(agg, by=names).order_by(*names)

    def filter(self, expr: ds.Expression) -> ds.Dataset | None:
        """Attempt to apply filter to partition keys."""
        try:  # raises ValueError if filter references non-partition keys
            ds.dataset([], schema=self.partitioning.schema).scanner(filter=expr)
        except (AttributeError, ValueError):
            return None
        return self if expr is None else self.filter(expr)

    def to_table(self) -> ibis.Table:
        """Return ibis `Table` from filtered dataset."""
        paths = [frag.path for frag in self._get_fragments(self._scan_options.get('filter'))]
        hive = isinstance(self.partitioning, ds.HivePartitioning)
        return ibis.read_parquet(paths, hive_partitioning=hive)

    def rank(self, limit: int, *names: str, dense: bool = False) -> ibis.Table:
        """Return ordered limited partitions as a table."""
        keys = {name.strip('-'): order_key(name) for name in names}
        table = Parquet.fragments(self, counts='_').order_by(*keys.values()).cache()
        groups = table.aggregate(count=ibis._.count(), total=table['_'].sum(), by=list(keys))
        groups = groups.order_by(*keys.values()).cache()
        if not dense:
            totals = itertools.accumulate(groups['total'].to_list())
            limit = next((index for index, total in enumerate(totals, 1) if total >= limit), None)  # type: ignore
        limit = groups[:limit]['count'].sum().to_pyarrow().as_py()
        hive = isinstance(self.partitioning, ds.HivePartitioning)
        return ibis.read_parquet(table[:limit]['__path__'].to_list(), hive_partitioning=hive)
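
These methods are written to be called unbound, passing any pyarrow.dataset.Dataset as self. A minimal usage sketch, assuming a hypothetical hive-partitioned directory with year and month keys:

import pyarrow.dataset as ds
from graphique.core import Parquet

# hypothetical layout: data/year=2024/month=1/part-0.parquet
dataset = ds.dataset('data/', format='parquet', partitioning='hive')

Parquet.schema(dataset).names            # ['year', 'month']
Parquet.keys(dataset, 'year', '-month')  # ['year', '-month']
Parquet.group(dataset, 'year', counts='count')  # ibis table of row counts per year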

filter(expr)

Attempt to apply filter to partition keys.

Source code in graphique/core.py
def filter(self, expr: ds.Expression) -> ds.Dataset | None:
    """Attempt to apply filter to partition keys."""
    try:  # raises ValueError if filter references non-partition keys
        ds.dataset([], schema=self.partitioning.schema).scanner(filter=expr)
    except (AttributeError, ValueError):
        return None
    return self if expr is None else self.filter(expr)
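
A hedged sketch of the pushdown check, reusing the hypothetical dataset above: a filter that only references partition keys is applied to the dataset, while one referencing a data column returns None so the caller falls back to table-level filtering.

import pyarrow.compute as pc

Parquet.filter(dataset, pc.field('year') == 2024)  # dataset filtered on the partition key
Parquet.filter(dataset, pc.field('value') > 0)     # None: 'value' is not a partition key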

fragments(counts='')

Return partition fragments as a table.

Source code in graphique/core.py
def fragments(self, counts: str = '') -> ibis.Table:
    """Return partition fragments as a table."""
    parts = []
    for frag in self._get_fragments(self._scan_options.get('filter')):
        parts.append(ds.get_partition_keys(frag.partition_expression))
        parts[-1]['__path__'] = frag.path
        if counts:
            parts[-1][counts] = frag.count_rows()
    return ibis.memtable(pa.Table.from_pylist(parts))
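
Each fragment contributes one row of its partition keys plus its __path__, and optionally a row count under the counts alias. A sketch with the hypothetical dataset above:

frags = Parquet.fragments(dataset, counts='rows')
frags.columns  # ['year', 'month', '__path__', 'rows'] for the hypothetical layout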

group(*names, counts='')

Return grouped partitions as a table.

Source code in graphique/core.py
def group(self, *names, counts: str = '') -> ibis.Table:
    """Return grouped partitions as a table."""
    table = Parquet.fragments(self, counts)
    agg = {counts: table[counts].sum()} if counts else {}
    return table.aggregate(agg, by=names).order_by(*names)

keys(*names)

Return prefix of matching partition keys.

Source code in graphique/core.py
def keys(self, *names) -> list:
    """Return prefix of matching partition keys."""
    keys = set(Parquet.schema(self).names)
    return list(itertools.takewhile(lambda name: name.lstrip('-') in keys, names))
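
Names are matched as a prefix: a leading `-` (the descending-order marker) is ignored, and matching stops at the first name that is not a partition key. With the hypothetical year/month keys:

Parquet.keys(dataset, 'year', '-month', 'value')  # ['year', '-month']
Parquet.keys(dataset, 'value', 'year')            # []: the prefix fails immediately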

rank(limit, *names, dense=False)

Return ordered limited partitions as a table.

Source code in graphique/core.py
def rank(self, limit: int, *names: str, dense: bool = False) -> ibis.Table:
    """Return ordered limited partitions as a table."""
    keys = {name.strip('-'): order_key(name) for name in names}
    table = Parquet.fragments(self, counts='_').order_by(*keys.values()).cache()
    groups = table.aggregate(count=ibis._.count(), total=table['_'].sum(), by=list(keys))
    groups = groups.order_by(*keys.values()).cache()
    if not dense:
        totals = itertools.accumulate(groups['total'].to_list())
        limit = next((index for index, total in enumerate(totals, 1) if total >= limit), None)  # type: ignore
    limit = groups[:limit]['count'].sum().to_pyarrow().as_py()
    hive = isinstance(self.partitioning, ds.HivePartitioning)
    return ibis.read_parquet(table[:limit]['__path__'].to_list(), hive_partitioning=hive)
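
Fragments are ordered by the keys; without dense rank, cumulative fragment row totals determine how many partitions are needed to cover `limit` rows, and only those paths are read. With dense rank, `limit` counts distinct key groups instead. A hedged sketch, assuming hypothetical yearly partitions of 10, 20, and 30 rows:

# cumulative totals [10, 30, 60]: the first two partitions cover 25 rows
top = Parquet.rank(dataset, 25, 'year')
# dense rank: all rows for the first two distinct years
top = Parquet.rank(dataset, 2, 'year', dense=True)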

schema()

partition schema

Source code in graphique/core.py
def schema(self) -> pa.Schema:
    """partition schema"""
    return self.partitioning.schema if hasattr(self, 'partitioning') else pa.schema([])

to_table()

Return ibis Table from filtered dataset.

Source code in graphique/core.py
def to_table(self) -> ibis.Table:
    """Return ibis `Table` from filtered dataset."""
    paths = [frag.path for frag in self._get_fragments(self._scan_options.get('filter'))]
    hive = isinstance(self.partitioning, ds.HivePartitioning)
    return ibis.read_parquet(paths, hive_partitioning=hive)
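
Only the paths of fragments surviving the dataset's filter are handed to ibis, so partition pruning happens before any rows are read. A sketch combining this with filter above:

pruned = Parquet.filter(dataset, pc.field('year') == 2024)
table = Parquet.to_table(pruned)  # ibis table over only the matching files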

graphique.interface.Dataset

Source code in graphique/interface.py
@strawberry.interface(description="ibis `Table` or arrow `Dataset`")
class Dataset:
    def __init__(self, source: Source):
        self.source = source

    @property
    def table(self) -> ibis.Table:
        """source as ibis table"""
        return self.source if isinstance(self.source, ibis.Table) else Parquet.to_table(self.source)

    def resolve(self, info: Info, source: ibis.Table) -> Self:
        """Cache the table if it will be reused."""
        counts = selections(*info.selected_fields)
        counts['type'] = counts['schema'] = 0
        if counts.total() > 1 and isinstance(source, ibis.Table):
            if names := self.select(info, source):
                source = source.select(*names).cache()
        return type(self)(source)

    @classmethod
    @no_type_check
    def resolve_reference(cls, info: Info, **keys) -> Self:
        """Return table filtered by federated keys."""
        self = getattr(info.root_value, cls.field)
        queries = {name: Filter(eq=[keys[name]]) for name in keys}
        return self.filter(info, **queries)

    @staticmethod
    def select(info: Info, source: Source) -> list:
        """Return minimal schema needed to continue."""
        refs: set = set()
        for field in info.selected_fields:
            for selection in field.selections:
                refs.update(references(selection))
        return [name for name in ibis_schema(source) if name in refs]

    def columns(self, info: Info) -> dict:
        """Fields for each column."""
        names = selections(*info.selected_fields)
        table = self.table.select(*names).cache()
        return {name: Column.cast(table[name]) for name in table.columns}

    def row(self, info: Info, index: int = 0) -> dict:
        """Scalar values at index."""
        names = selections(*info.selected_fields)
        table = self.table.select(*names)[index:][:1].cache()
        row = {}
        for name in table.columns:
            if isinstance(table[name], ibis.expr.types.ArrayColumn):
                row[name] = Column.cast(table[name].first().unnest())
            else:
                (row[name],) = table[name].to_list()
        return row

    def filter(self, info: Info, where: Expression | None = None, **queries: Filter) -> Self:
        """[Filter](https://ibis-project.org/reference/expression-tables#ibis.expr.types.relations.Table.filter) rows by predicates.

        Schema derived fields provide syntax for simple queries; `where` supports complex queries.
        """
        exprs: list = [] if where is None else list(where)  # type: ignore
        source = Parquet.filter(self.source, Filter.to_arrow(**queries))
        if source is None:
            exprs += Filter.to_exprs(**queries)
            source = self.table
        elif exprs:
            source = Parquet.to_table(source)
        return self.resolve(info, source.filter(*exprs) if exprs else source)

    @strawberry.field(
        description=f"[ibis table]({links.ref}/expression-table) or [arrow dataset](https://arrow.apache.org/docs/python/api/dataset.html)"
    )
    def type(self) -> str:
        return type(self.source).__name__

    @strawberry.field(description=links.schema)
    def schema(self) -> Schema:
        schema = ibis_schema(self.source)
        partitioning = Parquet.schema(self.source).names
        return Schema(names=schema.names, types=schema.types, partitioning=partitioning)  # type: ignore

    @doc_field(
        schema="field names and types",
        try_="return null if cast fails",
    )
    def cast(self, info: Info, schema: list[Field], try_: bool = False) -> Self:
        """[Cast](https://ibis-project.org/reference/expression-tables#ibis.expr.types.relations.Table.cast) the columns of a table."""
        cast = self.table.try_cast if try_ else self.table.cast
        return self.resolve(info, cast({field.name: field.type for field in schema}))

    @doc_field
    def optional(self, info: Info) -> Self | None:
        """Nullable field to stop error propagation, enabling partial query results.

        Will be replaced by client controlled nullability.
        """
        return self.resolve(info, self.source)

    @strawberry.field(
        description=f"[Count]({links.ref}/expression-tables#ibis.expr.types.relations.Table.count) the number of rows."
    )
    def count(self) -> BigInt:
        if isinstance(self.source, ibis.Table):
            return self.source.count().to_pyarrow().as_py()
        return self.source.count_rows()

    @doc_field
    def any(self, limit: BigInt = 1) -> bool:
        """Whether there are at least `limit` rows.

        May be significantly faster than `count` for out-of-core data.
        """
        return self.table[:limit].count().to_pyarrow().as_py() >= limit

    @doc_field(
        name="column name(s); multiple names access nested struct fields",
        cast=f"cast expression to indicated {links.types}",
        try_="return null if cast fails",
    )
    def column(self, name: list[str], cast: str = '', try_: bool = False) -> Column | None:
        """Column of any type by name.

        If the column is in the schema, `columns` can be used instead.
        """
        column = getitems(self.table, *name)
        if cast:
            column = (column.try_cast if try_ else column.cast)(cast)
        return Column.cast(column.as_table().cache()[0])

    @doc_field(
        offset="number of rows to skip; negative value skips from the end",
        limit="maximum number of rows to return",
    )
    def slice(self, info: Info, offset: BigInt = 0, limit: BigInt | None = None) -> Self:
        """[Limit](https://ibis-project.org/reference/expression-tables#ibis.expr.types.relations.Table.limit) row selection."""
        return self.resolve(info, self.table[offset:][:limit])

    @doc_field(
        on="column names to deduplicate on; defaults to all",
        keep="which duplicates to keep",
        counts=f"[value counts]({links.ref}/expression-tables#ibis.expr.types.relations.Table.value_counts); incompatible with `keep: null`",
        order="optionally include and order by first row number; incompatible with `on: null`",
    )
    def distinct(
        self,
        info: Info,
        on: list[str] | None = None,
        keep: str | None = 'first',
        counts: str = '',
        order: str = '',
    ) -> Self:
        """[Remove duplicate](https://ibis-project.org/reference/expression-tables#ibis.expr.types.relations.Table.distinct) rows from table.

        Differs from `group` by keeping all columns, and defaulting to all keys.
        """
        table = self.table
        if order:
            table = table.mutate({order: ibis.row_number()})
        if not counts or keep is None:
            table = table.distinct(on=on, keep=keep)
        elif on is None:
            table = table.value_counts(name=counts)
        else:
            keys, func = set(on), operator.methodcaller(keep)
            aggs = {name: func(table[name]) for name in table.columns if name not in keys}
            aggs[counts] = ibis._.count()
            table = table.aggregate(aggs, by=on)
        return self.resolve(info, table.order_by(order) if order else table)

    @doc_field(
        by="column names; empty will aggregate into a single row table",
        counts="optionally include counts in an aliased column",
        order="optionally include and order by first row number",
        aggregate="aggregation functions applied to other columns",
    )
    def group(
        self,
        info: Info,
        by: list[str] = [],
        counts: str = '',
        order: str = '',
        aggregate: Aggregates = {},  # type: ignore
    ) -> Self:
        """[Group](https://ibis-project.org/reference/expression-tables#ibis.expr.types.relations.Table.group_by) table by columns."""
        aggs = dict(aggregate)  # type: ignore
        if not aggs and not order and by == Parquet.keys(self.source, *by):
            return self.resolve(info, Parquet.group(self.source, *by, counts=counts))
        table = self.table
        if counts:
            aggs[counts] = ibis._.count()
        if order:
            table = table.mutate({order: ibis.row_number()})
            aggs[order] = table[order].first()
        table = table.aggregate(aggs, by=by)
        return self.resolve(info, table.order_by(order) if order else table)

    @doc_field(
        by="column names; prefix with `-` for descending order",
        limit="maximum number of rows to return; optimized for partitioned dataset keys",
        dense="use dense rank with `limit`",
    )
    def order(
        self, info: Info, by: list[str], limit: BigInt | None = None, dense: bool = False
    ) -> Self:
        """[Sort](https://ibis-project.org/reference/expression-tables#ibis.expr.types.relations.Table.order_by) table by columns."""
        keys = Parquet.keys(self.source, *by)
        if keys and limit is not None:
            table = Parquet.rank(self.source, limit, *keys, dense=dense)
            if keys == by:
                return self.resolve(info, table if dense else table[:limit])
        else:
            table = self.table
        if dense and limit is not None:
            groups = table.aggregate(_=ibis._.count(), by=[name.lstrip('-') for name in by])
            limit = groups.order_by(*map(order_key, by))[:limit]['_'].sum().to_pyarrow().as_py()
        return self.resolve(info, table.order_by(*map(order_key, by))[:limit])

    @doc_field(
        name="column name",
        offset="optionally include index column",
        keep_empty="keep empty array values as null",
        row_number="optionally include first row number in an aliased column",
    )
    def unnest(
        self,
        info: Info,
        name: str,
        offset: str = '',
        keep_empty: bool = False,
        row_number: str = '',
    ) -> Self:
        """[Unnest](https://ibis-project.org/reference/expression-tables#ibis.expr.types.relations.Table.unnest) an array column from a table."""
        table = self.table
        if row_number:
            table = table.mutate({row_number: ibis.row_number()})
        return self.resolve(info, table.unnest(name, offset=offset or None, keep_empty=keep_empty))

    @doc_field(
        right="name of right table; must be on root Query type",
        keys="column names used as keys on the left side",
        rkeys="column names used as keys on the right side; defaults to left side",
        how="the kind of join: 'inner', 'left', 'right', ...",
        lname="format string to use to rename overlapping columns in the left table",
        rname="format string to use to rename overlapping columns in the right table",
    )
    def join(
        self,
        info: Info,
        right: str,
        keys: list[str],
        rkeys: list[str] = [],
        how: str = 'inner',
        lname: str = '',
        rname: str = '{name}_right',
    ) -> Self:
        """[Join](https://ibis-project.org/reference/expression-tables#ibis.expr.types.relations.Table.join) two tables."""
        left = self.table
        right = getattr(info.root_value, right).table
        if rkeys:
            keys = [getattr(left, key) == getattr(right, rkey) for key, rkey in zip(keys, rkeys)]
        return self.resolve(
            info, left.join(right, predicates=keys, how=how, lname=lname, rname=rname)
        )

    @doc_field
    def take(self, info: Info, indices: list[BigInt]) -> Self:
        """[Take](https://arrow.apache.org/docs/python/generated/pyarrow.dataset.Dataset.html#pyarrow.dataset.Dataset.take) rows by index."""
        names = self.select(info, self.source)
        if isinstance(self.source, ds.Dataset):
            table = self.source.take(indices, columns=names)
        else:
            batches = self.source.select(*names).to_pyarrow_batches()
            table = ds.Scanner.from_batches(batches).take(indices)
        return type(self)(ibis.memtable(table))

    @doc_field(subset="column names; defaults to all", how="remove if `any` or `all` are null")
    def drop_null(self, info: Info, subset: list[str] | None = None, how: str = 'any') -> Self:
        """[Drop](https://ibis-project.org/reference/expression-tables#ibis.expr.types.relations.Table.drop_null) rows with null values."""
        return self.resolve(info, self.table.drop_null(subset, how=how))

    @doc_field(name="column name(s); defaults to all", value="JSON scalar", scalar="typed scalar")
    def fill_null(
        self,
        info: Info,
        name: list[str] | None = None,
        value: JSON | None = UNSET,
        scalar: Scalars = {},  # type: ignore
    ) -> Self:
        """[Fill null](https://ibis-project.org/reference/expression-tables.html#ibis.expr.types.relations.Table.fill_null) values."""
        (value,) = itertools.chain(scalar, [] if value is UNSET else [value])  # type: ignore
        replacements = dict.fromkeys(ibis_schema(self.source) if name is None else name, value)
        return self.resolve(info, self.table.fill_null(replacements))

    @doc_field
    def project(self, info: Info, columns: list[Projection]) -> Self:
        """[Mutate](https://ibis-project.org/reference/expression-tables#ibis.expr.types.relations.Table.mutate) columns by expressions.

        Renamed to not be confused with a mutation.
        """
        projection = dict(map(Projection.to_ibis, columns))
        return self.resolve(info, self.table.mutate(projection))

    @doc_field(
        by="column names to compare by equality",
        split="boolean column expressions to split on true values",
        counts="optionally include counts in an aliased column",
        alias="format string to name index columns",
        aggregate="aggregation functions applied to other columns",
    )
    def runs(
        self,
        info: Info,
        by: list[str] = [],
        split: list[Projection] = [],
        counts: str = '',
        alias: str = '{}_index',
        aggregate: Aggregates = {},  # type: ignore
    ) -> Self:
        """Provisionally group table by adjacent values in columns."""
        table = self.table
        projection = dict(map(Projection.to_ibis, split))
        for name in by:
            column = table[name] != table[name].lag()
            projection[alias.format(name)] = column.fill_null(False)
        aggs = {name: ibis._[name].first() for name in by}
        aggs.update(aggregate)  # type: ignore
        if counts:
            aggs[counts] = ibis._.count()
        table = table.mutate(projection)  # window functions can't be nested
        table = table.mutate({name: table[name].cumsum() for name in projection})
        return self.resolve(info, table.aggregate(aggs, by=list(projection)).order_by(*projection))

    runs.directives = [provisional()]

table property

source as ibis table

any(limit=1)

Whether there are at least limit rows.

May be significantly faster than count for out-of-core data.

Source code in graphique/interface.py
@doc_field
def any(self, limit: BigInt = 1) -> bool:
    """Whether there are at least `limit` rows.

    May be significantly faster than `count` for out-of-core data.
    """
    return self.table[:limit].count().to_pyarrow().as_py() >= limit

cast(info, schema, try_=False)

Cast the columns of a table.

Source code in graphique/interface.py
@doc_field(
    schema="field names and types",
    try_="return null if cast fails",
)
def cast(self, info: Info, schema: list[Field], try_: bool = False) -> Self:
    """[Cast](https://ibis-project.org/reference/expression-tables#ibis.expr.types.relations.Table.cast) the columns of a table."""
    cast = self.table.try_cast if try_ else self.table.cast
    return self.resolve(info, cast({field.name: field.type for field in schema}))

column(name, cast='', try_=False)

Column of any type by name.

If the column is in the schema, columns can be used instead.

Source code in graphique/interface.py
@doc_field(
    name="column name(s); multiple names access nested struct fields",
    cast=f"cast expression to indicated {links.types}",
    try_="return null if cast fails",
)
def column(self, name: list[str], cast: str = '', try_: bool = False) -> Column | None:
    """Column of any type by name.

    If the column is in the schema, `columns` can be used instead.
    """
    column = getitems(self.table, *name)
    if cast:
        column = (column.try_cast if try_ else column.cast)(cast)
    return Column.cast(column.as_table().cache()[0])

columns(info)

Fields for each column.

Source code in graphique/interface.py
def columns(self, info: Info) -> dict:
    """Fields for each column."""
    names = selections(*info.selected_fields)
    table = self.table.select(*names).cache()
    return {name: Column.cast(table[name]) for name in table.columns}

distinct(info, on=None, keep='first', counts='', order='')

Remove duplicate rows from table.

Differs from group by keeping all columns, and defaulting to all keys.

Source code in graphique/interface.py
@doc_field(
    on="column names to deduplicate on; defaults to all",
    keep="which duplicates to keep",
    counts=f"[value counts]({links.ref}/expression-tables#ibis.expr.types.relations.Table.value_counts); incompatible with `keep: null`",
    order="optionally include and order by first row number; incompatible with `on: null`",
)
def distinct(
    self,
    info: Info,
    on: list[str] | None = None,
    keep: str | None = 'first',
    counts: str = '',
    order: str = '',
) -> Self:
    """[Remove duplicate](https://ibis-project.org/reference/expression-tables#ibis.expr.types.relations.Table.distinct) rows from table.

    Differs from `group` by keeping all columns, and defaulting to all keys.
    """
    table = self.table
    if order:
        table = table.mutate({order: ibis.row_number()})
    if not counts or keep is None:
        table = table.distinct(on=on, keep=keep)
    elif on is None:
        table = table.value_counts(name=counts)
    else:
        keys, func = set(on), operator.methodcaller(keep)
        aggs = {name: func(table[name]) for name in table.columns if name not in keys}
        aggs[counts] = ibis._.count()
        table = table.aggregate(aggs, by=on)
    return self.resolve(info, table.order_by(order) if order else table)

drop_null(info, subset=None, how='any')

Drop rows with null values.

Source code in graphique/interface.py
@doc_field(subset="column names; defaults to all", how="remove if `any` or `all` are null")
def drop_null(self, info: Info, subset: list[str] | None = None, how: str = 'any') -> Self:
    """[Drop](https://ibis-project.org/reference/expression-tables#ibis.expr.types.relations.Table.drop_null) rows with null values."""
    return self.resolve(info, self.table.drop_null(subset, how=how))

fill_null(info, name=None, value=UNSET, scalar={})

Fill null values.

Source code in graphique/interface.py
@doc_field(name="column name(s); defaults to all", value="JSON scalar", scalar="typed scalar")
def fill_null(
    self,
    info: Info,
    name: list[str] | None = None,
    value: JSON | None = UNSET,
    scalar: Scalars = {},  # type: ignore
) -> Self:
    """[Fill null](https://ibis-project.org/reference/expression-tables.html#ibis.expr.types.relations.Table.fill_null) values."""
    (value,) = itertools.chain(scalar, [] if value is UNSET else [value])  # type: ignore
    replacements = dict.fromkeys(ibis_schema(self.source) if name is None else name, value)
    return self.resolve(info, self.table.fill_null(replacements))

filter(info, where=None, **queries)

Filter rows by predicates.

Schema derived fields provide syntax for simple queries; where supports complex queries.

Source code in graphique/interface.py
def filter(self, info: Info, where: Expression | None = None, **queries: Filter) -> Self:
    """[Filter](https://ibis-project.org/reference/expression-tables#ibis.expr.types.relations.Table.filter) rows by predicates.

    Schema derived fields provide syntax for simple queries; `where` supports complex queries.
    """
    exprs: list = [] if where is None else list(where)  # type: ignore
    source = Parquet.filter(self.source, Filter.to_arrow(**queries))
    if source is None:
        exprs += Filter.to_exprs(**queries)
        source = self.table
    elif exprs:
        source = Parquet.to_table(source)
    return self.resolve(info, source.filter(*exprs) if exprs else source)

group(info, by=[], counts='', order='', aggregate={})

Group table by columns.

Source code in graphique/interface.py
@doc_field(
    by="column names; empty will aggregate into a single row table",
    counts="optionally include counts in an aliased column",
    order="optionally include and order by first row number",
    aggregate="aggregation functions applied to other columns",
)
def group(
    self,
    info: Info,
    by: list[str] = [],
    counts: str = '',
    order: str = '',
    aggregate: Aggregates = {},  # type: ignore
) -> Self:
    """[Group](https://ibis-project.org/reference/expression-tables#ibis.expr.types.relations.Table.group_by) table by columns."""
    aggs = dict(aggregate)  # type: ignore
    if not aggs and not order and by == Parquet.keys(self.source, *by):
        return self.resolve(info, Parquet.group(self.source, *by, counts=counts))
    table = self.table
    if counts:
        aggs[counts] = ibis._.count()
    if order:
        table = table.mutate({order: ibis.row_number()})
        aggs[order] = table[order].first()
    table = table.aggregate(aggs, by=by)
    return self.resolve(info, table.order_by(order) if order else table)

join(info, right, keys, rkeys=[], how='inner', lname='', rname='{name}_right')

Join two tables.

Source code in graphique/interface.py
@doc_field(
    right="name of right table; must be on root Query type",
    keys="column names used as keys on the left side",
    rkeys="column names used as keys on the right side; defaults to left side",
    how="the kind of join: 'inner', 'left', 'right', ...",
    lname="format string to use to rename overlapping columns in the left table",
    rname="format string to use to rename overlapping columns in the right table",
)
def join(
    self,
    info: Info,
    right: str,
    keys: list[str],
    rkeys: list[str] = [],
    how: str = 'inner',
    lname: str = '',
    rname: str = '{name}_right',
) -> Self:
    """[Join](https://ibis-project.org/reference/expression-tables#ibis.expr.types.relations.Table.join) two tables."""
    left = self.table
    right = getattr(info.root_value, right).table
    if rkeys:
        keys = [getattr(left, key) == getattr(right, rkey) for key, rkey in zip(keys, rkeys)]
    return self.resolve(
        info, left.join(right, predicates=keys, how=how, lname=lname, rname=rname)
    )

optional(info)

Nullable field to stop error propagation, enabling partial query results.

Will be replaced by client controlled nullability.

Source code in graphique/interface.py
@doc_field
def optional(self, info: Info) -> Self | None:
    """Nullable field to stop error propagation, enabling partial query results.

    Will be replaced by client controlled nullability.
    """
    return self.resolve(info, self.source)

order(info, by, limit=None, dense=False)

Sort table by columns.

Source code in graphique/interface.py
@doc_field(
    by="column names; prefix with `-` for descending order",
    limit="maximum number of rows to return; optimized for partitioned dataset keys",
    dense="use dense rank with `limit`",
)
def order(
    self, info: Info, by: list[str], limit: BigInt | None = None, dense: bool = False
) -> Self:
    """[Sort](https://ibis-project.org/reference/expression-tables#ibis.expr.types.relations.Table.order_by) table by columns."""
    keys = Parquet.keys(self.source, *by)
    if keys and limit is not None:
        table = Parquet.rank(self.source, limit, *keys, dense=dense)
        if keys == by:
            return self.resolve(info, table if dense else table[:limit])
    else:
        table = self.table
    if dense and limit is not None:
        groups = table.aggregate(_=ibis._.count(), by=[name.lstrip('-') for name in by])
        limit = groups.order_by(*map(order_key, by))[:limit]['_'].sum().to_pyarrow().as_py()
    return self.resolve(info, table.order_by(*map(order_key, by))[:limit])
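
With dense, limit bounds the number of distinct key values rather than rows, implemented by summing group counts. A minimal sketch of that arithmetic, with hypothetical data:

import ibis

t = ibis.memtable({'a': [1, 1, 2, 2, 3]})
groups = t.aggregate(_=ibis._.count(), by=['a']).order_by('a')
rows = groups[:2]['_'].sum().to_pyarrow().as_py()  # dense limit 2 covers 4 rows
t.order_by('a')[:rows]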

project(info, columns)

Mutate columns by expressions.

Renamed to not be confused with a mutation.

Source code in graphique/interface.py
@doc_field
def project(self, info: Info, columns: list[Projection]) -> Self:
    """[Mutate](https://ibis-project.org/reference/expression-tables#ibis.expr.types.relations.Table.mutate) columns by expressions.

    Renamed to not be confused with a mutation.
    """
    projection = dict(map(Projection.to_ibis, columns))
    return self.resolve(info, self.table.mutate(projection))

resolve(info, source)

Cache the table if it will be reused.

Source code in graphique/interface.py
def resolve(self, info: Info, source: ibis.Table) -> Self:
    """Cache the table if it will be reused."""
    counts = selections(*info.selected_fields)
    counts['type'] = counts['schema'] = 0
    if counts.total() > 1 and isinstance(source, ibis.Table):
        if names := self.select(info, source):
            source = source.select(*names).cache()
    return type(self)(source)

resolve_reference(info, **keys) classmethod

Return table filtered by federated keys.

Source code in graphique/interface.py
@classmethod
@no_type_check
def resolve_reference(cls, info: Info, **keys) -> Self:
    """Return table filtered by federated keys."""
    self = getattr(info.root_value, cls.field)
    queries = {name: Filter(eq=[keys[name]]) for name in keys}
    return self.filter(info, **queries)

row(info, index=0)

Scalar values at index.

Source code in graphique/interface.py
def row(self, info: Info, index: int = 0) -> dict:
    """Scalar values at index."""
    names = selections(*info.selected_fields)
    table = self.table.select(*names)[index:][:1].cache()
    row = {}
    for name in table.columns:
        if isinstance(table[name], ibis.expr.types.ArrayColumn):
            row[name] = Column.cast(table[name].first().unnest())
        else:
            (row[name],) = table[name].to_list()
    return row

runs(info, by=[], split=[], counts='', alias='{}_index', aggregate={})

Provisionally group table by adjacent values in columns.

Source code in graphique/interface.py
@doc_field(
    by="column names to compare by equality",
    split="boolean column expressions to split on true values",
    counts="optionally include counts in an aliased column",
    alias="format string to name index columns",
    aggregate="aggregation functions applied to other columns",
)
def runs(
    self,
    info: Info,
    by: list[str] = [],
    split: list[Projection] = [],
    counts: str = '',
    alias: str = '{}_index',
    aggregate: Aggregates = {},  # type: ignore
) -> Self:
    """Provisionally group table by adjacent values in columns."""
    table = self.table
    projection = dict(map(Projection.to_ibis, split))
    for name in by:
        column = table[name] != table[name].lag()
        projection[alias.format(name)] = column.fill_null(False)
    aggs = {name: ibis._[name].first() for name in by}
    aggs.update(aggregate)  # type: ignore
    if counts:
        aggs[counts] = ibis._.count()
    table = table.mutate(projection)  # window functions can't be nested
    table = table.mutate({name: table[name].cumsum() for name in projection})
    return self.resolve(info, table.aggregate(aggs, by=list(projection)).order_by(*projection))
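
The lag/cumsum pattern above assigns each row a run index that increments whenever an adjacent value changes. A minimal sketch mirroring that pattern with hypothetical data, relying on insertion order:

import ibis

t = ibis.memtable({'a': [1, 1, 2, 2, 1]})
flag = (t['a'] != t['a'].lag()).fill_null(False)  # True where the value changes
t = t.mutate(a_index=flag)
t = t.mutate(a_index=t['a_index'].cumsum())  # run indexes: 0, 0, 1, 1, 2
t.aggregate(a=t['a'].first(), by=['a_index']).order_by('a_index')  # runs: 1, 2, 1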

select(info, source) staticmethod

Return minimal schema needed to continue.

Source code in graphique/interface.py
@staticmethod
def select(info: Info, source: Source) -> list:
    """Return minimal schema needed to continue."""
    refs: set = set()
    for field in info.selected_fields:
        for selection in field.selections:
            refs.update(references(selection))
    return [name for name in ibis_schema(source) if name in refs]

slice(info, offset=0, limit=None)

Limit row selection.

Source code in graphique/interface.py
@doc_field(
    offset="number of rows to skip; negative value skips from the end",
    limit="maximum number of rows to return",
)
def slice(self, info: Info, offset: BigInt = 0, limit: BigInt | None = None) -> Self:
    """[Limit](https://ibis-project.org/reference/expression-tables#ibis.expr.types.relations.Table.limit) row selection."""
    return self.resolve(info, self.table[offset:][:limit])

take(info, indices)

Take rows by index.

Source code in graphique/interface.py
@doc_field
def take(self, info: Info, indices: list[BigInt]) -> Self:
    """[Take](https://arrow.apache.org/docs/python/generated/pyarrow.dataset.Dataset.html#pyarrow.dataset.Dataset.take) rows by index."""
    names = self.select(info, self.source)
    if isinstance(self.source, ds.Dataset):
        table = self.source.take(indices, columns=names)
    else:
        batches = self.source.select(*names).to_pyarrow_batches()
        table = ds.Scanner.from_batches(batches).take(indices)
    return type(self)(ibis.memtable(table))

unnest(info, name, offset='', keep_empty=False, row_number='')

Unnest an array column from a table.

Source code in graphique/interface.py
@doc_field(
    name="column name",
    offset="optionally include index column",
    keep_empty="keep empty array values as null",
    row_number="optionally include first row number in an aliased column",
)
def unnest(
    self,
    info: Info,
    name: str,
    offset: str = '',
    keep_empty: bool = False,
    row_number: str = '',
) -> Self:
    """[Unnest](https://ibis-project.org/reference/expression-tables#ibis.expr.types.relations.Table.unnest) an array column from a table."""
    table = self.table
    if row_number:
        table = table.mutate({row_number: ibis.row_number()})
    return self.resolve(info, table.unnest(name, offset=offset or None, keep_empty=keep_empty))

graphique.middleware.GraphQL

Bases: GraphQL

ASGI GraphQL app with root value(s).

Parameters:

    root (Source): root dataset to attach as the Query type; required
    metrics (bool): enable timing extension; default False
    **kwargs: additional asgi.GraphQL options; default {}
Source code in graphique/middleware.py
class GraphQL(strawberry.asgi.GraphQL):
    """ASGI GraphQL app with root value(s).

    Args:
        root: root dataset to attach as the Query type
        metrics: enable timing extension
        **kwargs: additional `asgi.GraphQL` options
    """

    options = dict(types=Column.registry.values(), scalar_overrides=scalar_map)

    def __init__(self, root: Source, metrics: bool = False, **kwargs):
        options: dict = dict(self.options, extensions=[MetricsExtension] if metrics else [])
        if type(root).__name__ == 'Query':
            self.root_value = root
            options['enable_federation_2'] = True
            schema = strawberry.federation.Schema(type(self.root_value), **options)
        else:
            self.root_value = implemented(root)
            schema = strawberry.Schema(type(self.root_value), **options)
        super().__init__(schema, **kwargs)

    async def get_root_value(self, request):
        return self.root_value

    @classmethod
    def federated(cls, roots: Mapping[str, Source], keys: Mapping[str, Iterable] = {}, **kwargs):
        """Construct GraphQL app with multiple federated datasets.

        Args:
            roots: mapping of field names to root datasets
            keys: mapping of optional federation keys for each root
            **kwargs: additional `asgi.GraphQL` options
        """
        root_values = {name: implemented(roots[name], name, keys.get(name, ())) for name in roots}
        annotations = {name: type(root_values[name]) for name in root_values}
        Query = type('Query', (), {'__annotations__': annotations})
        return cls(strawberry.type(Query)(**root_values), **kwargs)
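
A minimal sketch of serving a dataset, with a hypothetical parquet file; the root may be any Source, such as an ibis table or arrow dataset:

import ibis
from graphique.middleware import GraphQL

app = GraphQL(ibis.read_parquet('data.parquet'), metrics=True)  # hypothetical file
# serve with any ASGI server, e.g.: uvicorn module:app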

federated(roots, keys={}, **kwargs) classmethod

Construct GraphQL app with multiple federated datasets.

Parameters:

    roots (Mapping[str, Source]): mapping of field names to root datasets; required
    keys (Mapping[str, Iterable]): mapping of optional federation keys for each root; default {}
    **kwargs: additional asgi.GraphQL options; default {}
Source code in graphique/middleware.py
@classmethod
def federated(cls, roots: Mapping[str, Source], keys: Mapping[str, Iterable] = {}, **kwargs):
    """Construct GraphQL app with multiple federated datasets.

    Args:
        roots: mapping of field names to root datasets
        keys: mapping of optional federation keys for each root
        **kwargs: additional `asgi.GraphQL` options
    """
    root_values = {name: implemented(roots[name], name, keys.get(name, ())) for name in roots}
    annotations = {name: type(root_values[name]) for name in root_values}
    Query = type('Query', (), {'__annotations__': annotations})
    return cls(strawberry.type(Query)(**root_values), **kwargs)
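
A minimal federated sketch, with hypothetical dataset and key names; each root becomes a field on the generated Query type:

import ibis
from graphique.middleware import GraphQL

app = GraphQL.federated(
    {'events': ibis.read_parquet('events.parquet')},  # hypothetical dataset
    keys={'events': ['id']},  # optional federation key(s) per root
)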