>>> from xorq.expr.udf import pyarrow_udwf
>>> import pyarrow as pa
>>> import xorq as xo
>>> import xorq.expr.datatypes as dt
>>> from xorq.vendor import ibis
pyarrow_udwf
xorq.expr.udf.pyarrow_udwf(
    fn,
    schema,
    return_type,
    name=None,
    namespace=Namespace(database=None, catalog=None),
    base=AggUDF,
    **config_kwargs,
)
Create a User-Defined Window Function (UDWF) using PyArrow.
This decorator creates window functions that process partitions of data, with support for ordering, framing, and ranking. UDWFs are useful for implementing custom analytics functions that must operate over ordered rows within each partition.
Parameters
Name | Type | Description | Default |
---|---|---|---|
fn | callable | The window function implementation. The signature depends on config_kwargs (see the signature stubs after this table): basic window function: fn(self, values: list[pa.Array], num_rows: int) -> pa.Array; with window frame: fn(self, values: list[pa.Array], eval_range: tuple[int, int]) -> pa.Scalar; with ranking: fn(self, num_rows: int, ranks_in_partition: list[tuple[int, int]]) -> pa.Array | required |
schema | Schema | Input schema defining column names and data types. | required |
return_type | DataType | The return data type of the window function. | required |
name | str | Name of the UDWF. If None, uses the function name. | None |
namespace | Namespace | Database and catalog namespace for the function. | Namespace(database=None, catalog=None) |
base | class | Base class for the UDWF (typically AggUDF). | AggUDF |
**config_kwargs | | Configuration options: uses_window_frame (bool): whether the function uses window framing; supports_bounded_execution (bool): whether the function supports bounded execution; include_rank (bool): whether the function uses ranking information; custom parameters: additional values accessible via self in the function | {} |
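The three fn signatures can be sketched as stubs (hypothetical function names, for illustration only):
>>> # Basic: receives all partition values, returns one array for the partition
>>> def basic_fn(self, values: list[pa.Array], num_rows: int) -> pa.Array: ...
>>> # uses_window_frame=True: receives a frame range, returns one scalar per call
>>> def framed_fn(self, values: list[pa.Array], eval_range: tuple[int, int]) -> pa.Scalar: ...
>>> # include_rank=True: receives rank ranges, returns one array for the partition
>>> def ranked_fn(self, num_rows: int, ranks_in_partition: list[tuple[int, int]]) -> pa.Array: ...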
Returns
Name | Type | Description |
---|---|---|
| callable | A UDWF constructor that can be used in window expressions. |
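As the examples below show, the constructor is applied to a table with on_expr and wrapped in a window specification. The general pattern (hypothetical table t and column names) is:
>>> # General usage pattern (hypothetical names)
>>> expr = t.mutate(
...     out=my_udwf.on_expr(t).over(
...         ibis.window(group_by="key", order_by="ts")
...     )
... )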
Examples
Exponential smoothing for penguin body mass by species:
>>> # Load penguins dataset
>>> penguins = xo.examples.penguins.fetch(backend=xo.connect())
>>> @pyarrow_udwf(
...     schema=ibis.schema({"body_mass_g": dt.float64}),
...     return_type=dt.float64,
...     alpha=0.8,  # Custom smoothing parameter
... )
... def smooth_body_mass(self, values: list[pa.Array], num_rows: int) -> pa.Array:
...     results = []
...     curr_value = 0.0
...     mass_values = values[0]  # body_mass_g column
...     for idx in range(num_rows):
...         if idx == 0:
...             curr_value = float(mass_values[idx].as_py() or 0)
...         else:
...             new_val = float(mass_values[idx].as_py() or curr_value)
...             curr_value = new_val * self.alpha + curr_value * (1.0 - self.alpha)
...         results.append(curr_value)
...     return pa.array(results)
>>> # Apply smoothing within each species, ordered by year
>>> result = penguins.mutate(
...     smooth_mass=smooth_body_mass.on_expr(penguins).over(
...         ibis.window(group_by="species", order_by="year")
...     )
... ).execute()
Running difference in penguin bill measurements:
>>> @pyarrow_udwf(
...     schema=ibis.schema({"bill_length_mm": dt.float64}),
...     return_type=dt.float64,
...     uses_window_frame=True,
... )
... def bill_length_diff(self, values: list[pa.Array], eval_range: tuple[int, int]) -> pa.Scalar:
...     start, stop = eval_range
...     bill_values = values[0]
...     if start == stop - 1:  # Single row
...         return pa.scalar(0.0)
...     current_val = bill_values[stop - 1].as_py() or 0
...     previous_val = bill_values[start].as_py() or 0
...     return pa.scalar(float(current_val - previous_val))
>>> # Calculate difference from previous measurement within species
>>> result = penguins.mutate(
...     bill_diff=bill_length_diff.on_expr(penguins).over(
...         ibis.window(
...             group_by="species",
...             order_by="year",
...             preceding=1,
...             following=0,
...         )
...     )
... ).execute()
Penguin ranking within species by body mass:
>>> @pyarrow_udwf(
...     schema=ibis.schema({"body_mass_g": dt.float64}),
...     return_type=dt.float64,
...     include_rank=True,
... )
... def mass_rank_score(self, num_rows: int, ranks_in_partition: list[tuple[int, int]]) -> pa.Array:
...     results = []
...     for idx in range(num_rows):
...         # Find rank for current row
...         rank = next(
...             i + 1
...             for i, (start, end) in enumerate(ranks_in_partition)
...             if start <= idx < end
...         )
...         # Convert rank to score (higher rank = higher score)
...         score = 1.0 - (rank - 1) / len(ranks_in_partition)
...         results.append(score)
...     return pa.array(results)
>>> # Calculate mass rank score within each species
>>> result = penguins.mutate(
...     mass_rank_score=mass_rank_score.on_expr(penguins).over(
...         ibis.window(group_by="species", order_by="body_mass_g")
...     )
... ).execute()
Complex penguin feature calculation across measurements:
>>> @pyarrow_udwf(
...     schema=ibis.schema({
...         "bill_length_mm": dt.float64,
...         "bill_depth_mm": dt.float64,
...         "flipper_length_mm": dt.float64,
...     }),
...     return_type=dt.float64,
...     window_size=3,  # Custom parameter for moving average
... )
... def penguin_size_trend(self, values: list[pa.Array], num_rows: int) -> pa.Array:
...     bill_length = values[0]
...     bill_depth = values[1]
...     flipper_length = values[2]
...     results = []
...     window_size = self.window_size
...     for idx in range(num_rows):
...         # Calculate size metric for current and surrounding rows
...         start_idx = max(0, idx - window_size // 2)
...         end_idx = min(num_rows, idx + window_size // 2 + 1)
...         size_metrics = []
...         for i in range(start_idx, end_idx):
...             if (
...                 bill_length[i].is_valid
...                 and bill_depth[i].is_valid
...                 and flipper_length[i].is_valid
...             ):
...                 # Composite size metric
...                 bill_area = bill_length[i].as_py() * bill_depth[i].as_py()
...                 size_metric = bill_area * flipper_length[i].as_py()
...                 size_metrics.append(size_metric)
...         # Average size metric in window
...         avg_size = sum(size_metrics) / len(size_metrics) if size_metrics else 0.0
...         results.append(avg_size)
...     return pa.array(results)
>>> # Apply size trend calculation within each species
>>> result = penguins.mutate(
...     size_trend=penguin_size_trend.on_expr(penguins).over(
...         ibis.window(group_by="species", order_by="year")
...     )
... ).execute()
Notes
The function signature and behavior change based on the configuration:
- Standard window function: Processes all rows in partition, returns array
- Frame-based: Processes specific row ranges, returns scalar per invocation
- Rank-aware: Has access to ranking information within partition
Custom parameters passed in config_kwargs are accessible as self.parameter_name
in the function implementation.
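As a minimal sketch of that mechanism (hypothetical scale parameter, basic signature):
>>> @pyarrow_udwf(
...     schema=ibis.schema({"x": dt.float64}),
...     return_type=dt.float64,
...     scale=2.0,  # custom config_kwarg, available as self.scale
... )
... def scaled(self, values: list[pa.Array], num_rows: int) -> pa.Array:
...     # Multiply each input value by the configured scale factor
...     return pa.array([float(v.as_py() or 0.0) * self.scale for v in values[0]])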
See Also
scalar : For row-by-row processing
agg : For aggregation across groups
make_pandas_udf : For pandas-based scalar operations