pyarrow_udwf

xorq.expr.udf.pyarrow_udwf(
    fn,
    schema,
    return_type,
    name=None,
    namespace=Namespace(database=None, catalog=None),
    base=AggUDF,
    **config_kwargs,
)

Create a User-Defined Window Function (UDWF) using PyArrow.

This decorator creates window functions that process partitions of data, with support for ordering, framing, and ranking. UDWFs are well suited to implementing custom analytics functions that operate over ordered rows within each partition.
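
At a glance, a UDWF is a plain function decorated with pyarrow_udwf and applied through on_expr and .over(). The following is a minimal sketch; the table t and the columns x, g, and o are placeholders, and fully runnable examples follow below:

>>> @pyarrow_udwf(
...     schema=ibis.schema({"x": dt.float64}),
...     return_type=dt.float64,
... )
... def my_udwf(self, values: list[pa.Array], num_rows: int) -> pa.Array:
...     # return one value per row in the partition
...     return pa.array([values[0][i].as_py() or 0.0 for i in range(num_rows)])
>>> expr = t.mutate(y=my_udwf.on_expr(t).over(ibis.window(group_by="g", order_by="o")))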

Parameters

fn : callable
    The window function implementation. The signature depends on
    config_kwargs:

      • Basic window function:
        fn(self, values: list[pa.Array], num_rows: int) -> pa.Array
      • With a window frame (uses_window_frame=True):
        fn(self, values: list[pa.Array], eval_range: tuple[int, int]) -> pa.Scalar
      • With ranking (include_rank=True):
        fn(self, num_rows: int, ranks_in_partition: list[tuple[int, int]]) -> pa.Array

    Required.
schema : Schema
    Input schema defining column names and data types. Required.
return_type : DataType
    The return data type of the window function. Required.
name : str
    Name of the UDWF. If None, the function's name is used. Default: None.
namespace : Namespace
    Database and catalog namespace for the function. Default:
    Namespace(database=None, catalog=None).
base : class
    Base class for the UDWF (typically AggUDF). Default: AggUDF.
**config_kwargs
    Configuration options:

      • uses_window_frame (bool): whether the function uses window framing
      • supports_bounded_execution (bool): whether the function supports
        bounded execution
      • include_rank (bool): whether the function uses ranking information
      • Custom parameters: additional values accessible via self in the
        function implementation

Returns

callable
    A UDWF constructor that can be used in window expressions.

Examples

Exponential smoothing for penguin body mass by species:

>>> from xorq.expr.udf import pyarrow_udwf
>>> import pyarrow as pa
>>> import xorq as xo
>>> import xorq.expr.datatypes as dt
>>> from xorq.vendor import ibis
>>> # Load penguins dataset
>>> penguins = xo.examples.penguins.fetch(backend=xo.connect())
>>> @pyarrow_udwf(
...     schema=ibis.schema({"body_mass_g": dt.float64}),
...     return_type=dt.float64,
...     alpha=0.8  # Custom smoothing parameter
... )
... def smooth_body_mass(self, values: list[pa.Array], num_rows: int) -> pa.Array:
...     results = []
...     curr_value = 0.0
...     mass_values = values[0]  # body_mass_g column
...
...     for idx in range(num_rows):
...         if idx == 0:
...             curr_value = float(mass_values[idx].as_py() or 0)
...         else:
...             new_val = float(mass_values[idx].as_py() or curr_value)
...             curr_value = new_val * self.alpha + curr_value * (1.0 - self.alpha)
...         results.append(curr_value)
...
...     return pa.array(results)
>>> # Apply smoothing within each species, ordered by year
>>> result = penguins.mutate(
...     smooth_mass=smooth_body_mass.on_expr(penguins).over(
...         ibis.window(group_by="species", order_by="year")
...     )
... ).execute()

Running difference in penguin bill measurements:

>>> @pyarrow_udwf(
...     schema=ibis.schema({"bill_length_mm": dt.float64}),
...     return_type=dt.float64,
...     uses_window_frame=True
... )
... def bill_length_diff(self, values: list[pa.Array], eval_range: tuple[int, int]) -> pa.Scalar:
...     start, stop = eval_range
...     bill_values = values[0]
...
...     if start == stop - 1:  # Single row
...         return pa.scalar(0.0)
...
...     current_val = bill_values[stop - 1].as_py() or 0
...     previous_val = bill_values[start].as_py() or 0
...     return pa.scalar(float(current_val - previous_val))
>>> # Calculate difference from previous measurement within species
>>> result = penguins.mutate(
...     bill_diff=bill_length_diff.on_expr(penguins).over(
...         ibis.window(
...             group_by="species",
...             order_by="year",
...             preceding=1,
...             following=0
...         )
...     )
... ).execute()

Penguin ranking within species by body mass:

>>> @pyarrow_udwf(
...     schema=ibis.schema({"body_mass_g": dt.float64}),
...     return_type=dt.float64,
...     include_rank=True
... )
... def mass_rank_score(self, num_rows: int, ranks_in_partition: list[tuple[int, int]]) -> pa.Array:
...     results = []
...     for idx in range(num_rows):
...         # Find rank for current row
...         rank = next(
...             i + 1 for i, (start, end) in enumerate(ranks_in_partition)
...             if start <= idx < end
...         )
...         # Convert rank to score (higher rank = higher score)
...         score = 1.0 - (rank - 1) / len(ranks_in_partition)
...         results.append(score)
...     return pa.array(results)
>>> # Calculate mass rank score within each species
>>> result = penguins.mutate(
...     mass_rank_score=mass_rank_score.on_expr(penguins).over(
...         ibis.window(group_by="species", order_by="body_mass_g")
...     )
... ).execute()

Complex penguin feature calculation across measurements:

>>> @pyarrow_udwf(
...     schema=ibis.schema({
...         "bill_length_mm": dt.float64,
...         "bill_depth_mm": dt.float64,
...         "flipper_length_mm": dt.float64
...     }),
...     return_type=dt.float64,
...     window_size=3  # Custom parameter for moving average
... )
... def penguin_size_trend(self, values: list[pa.Array], num_rows: int) -> pa.Array:
...     bill_length = values[0]
...     bill_depth = values[1]
...     flipper_length = values[2]
...
...     results = []
...     window_size = self.window_size
...
...     for idx in range(num_rows):
...         # Calculate size metric for current and surrounding rows
...         start_idx = max(0, idx - window_size // 2)
...         end_idx = min(num_rows, idx + window_size // 2 + 1)
...
...         size_metrics = []
...         for i in range(start_idx, end_idx):
...             if (bill_length[i].is_valid and bill_depth[i].is_valid and
...                 flipper_length[i].is_valid):
...                 # Composite size metric
...                 bill_area = bill_length[i].as_py() * bill_depth[i].as_py()
...                 size_metric = bill_area * flipper_length[i].as_py()
...                 size_metrics.append(size_metric)
...
...         # Average size metric in window
...         avg_size = sum(size_metrics) / len(size_metrics) if size_metrics else 0.0
...         results.append(avg_size)
...
...     return pa.array(results)
>>> # Apply size trend calculation within each species
>>> result = penguins.mutate(
...     size_trend=penguin_size_trend.on_expr(penguins).over(
...         ibis.window(group_by="species", order_by="year")
...     )
... ).execute()

Notes

The function's signature and behavior change based on configuration:

  • Standard window function: Processes all rows in partition, returns array
  • Frame-based: Processes specific row ranges, returns scalar per invocation
  • Rank-aware: Has access to ranking information within partition

Custom parameters passed in config_kwargs are accessible as self.parameter_name in the function implementation.
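
For quick reference, the three signature variants (and custom-parameter access) look like this. This is a sketch with trivial placeholder bodies; the column x, the function names, and the scale parameter are illustrative only:

>>> @pyarrow_udwf(
...     schema=ibis.schema({"x": dt.float64}),
...     return_type=dt.float64,
...     scale=2.0,  # custom parameter, available as self.scale
... )
... def basic(self, values: list[pa.Array], num_rows: int) -> pa.Array:
...     # standard variant: one output value per row in the partition
...     return pa.array([(values[0][i].as_py() or 0.0) * self.scale for i in range(num_rows)])
>>> @pyarrow_udwf(
...     schema=ibis.schema({"x": dt.float64}),
...     return_type=dt.float64,
...     uses_window_frame=True,
... )
... def framed(self, values: list[pa.Array], eval_range: tuple[int, int]) -> pa.Scalar:
...     # frame-based variant: one scalar per frame evaluation over rows [start, stop)
...     start, stop = eval_range
...     return pa.scalar(float(stop - start))
>>> @pyarrow_udwf(
...     schema=ibis.schema({"x": dt.float64}),
...     return_type=dt.float64,
...     include_rank=True,
... )
... def ranked(self, num_rows: int, ranks_in_partition: list[tuple[int, int]]) -> pa.Array:
...     # rank-aware variant: ranks_in_partition holds [start, end) row ranges sharing a rank
...     return pa.array([float(len(ranks_in_partition))] * num_rows)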

See Also

scalar : For row-by-row processing
agg : For aggregation across groups
make_pandas_udf : For pandas-based scalar operations