User-Defined Functions

User-defined functions (UDFs) in xorq serve as powerful tools to streamline data pipelines by letting you embed custom Python logic — row-wise transformations, aggregations, and windowed computations — directly into deferred query expressions.

Overview

xorq supports three types of user-defined functions (UDFs):

- Scalar UDFs: process data row by row
- UDAFs: aggregate functions that process groups of rows
- UDWFs: window functions that operate over partitions and frames

All UDFs integrate with xorq's execution engine for optimal performance.

Scalar UDFs

The simplest type of UDF — it processes one row at a time.

import pandas as pd

import xorq as xo
from xorq.expr.udf import make_pandas_udf
import xorq.vendor.ibis.expr.datatypes as dt

# Create sample data
df = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})
con = xo.connect()
t = con.register(df, "t")


# Define the UDF as a proper function rather than a named lambda
# (PEP 8 E731): this gives it a useful __name__ in tracebacks.
def my_least(df):
    """Return the row-wise minimum across the columns of *df*."""
    return df.min(axis=1)


# Build the pandas UDF from the function, the input schema of the
# columns it consumes, and the declared return dtype.
schema = t.select(["a", "b"]).schema()
udf = make_pandas_udf(my_least, schema, dt.int64(), name="my_least")

# Apply UDF: add a `min_val` column and execute the expression
result = t.mutate(min_val=udf.on_expr(t)).execute()

UDAFs (Aggregation Functions)

Process groups of rows to produce aggregate values.

from xorq.expr.udf import agg
import pyarrow.compute as pc

# Sample UDAF using PyArrow
@agg.pyarrow
def my_mean(arr: dt.float64) -> dt.float64:
    """Aggregate a float64 column to its arithmetic mean via PyArrow compute."""
    return pc.mean(arr)

# Using the UDAF in a group_by: fetch the example "batting" table
# eagerly into `con`, then compute the mean of games played (G) per year.
t = xo.examples.batting.fetch(deferred=False, backend=con, table_name="batting")
result = t.group_by("yearID").agg(mean_games=my_mean(t.G)).execute()

UDWFs (Window Functions)

Process partitions of data with ordering and framing.

from xorq.expr.udf import pyarrow_udwf
from xorq.vendor import ibis
import pyarrow as pa

# Define UDWF using the decorator approach
@pyarrow_udwf(
    schema=ibis.schema({"a": float}),
    return_type=ibis.dtype(float),
    alpha=0.9,
)
def exp_smooth(self, values: list[pa.Array], num_rows: int) -> pa.Array:
    """Exponentially smooth the window's single "a" column.

    The first row passes through unchanged; every later row blends the
    new sample (weight ``self.alpha``, configured on the decorator) with
    the previous smoothed value (weight ``1 - self.alpha``).
    """
    column = values[0]
    smoothed = []
    for row in range(num_rows):
        sample = column[row].as_py()
        if not smoothed:
            smoothed.append(sample)
        else:
            smoothed.append(sample * self.alpha + smoothed[-1] * (1.0 - self.alpha))
    return pa.array(smoothed)

# Register a tiny single-column table to smooth.
con = xo.connect()
batch = pa.RecordBatch.from_arrays(
    [pa.array([1.0, 2.0, 3.0, 4.0, 5.0])],
    names=["a"],
)
t = con.register(pa.Table.from_batches([batch]), table_name="t")

# Apply the UDWF over an unbounded window and materialize the result.
result = t.mutate(
    smoothed=exp_smooth.on_expr(t).over(ibis.window())
).execute()

Expr Scalar UDF

Expr Scalar UDFs allow you to incorporate pre-computed values (like trained models) into your UDF execution. This is particularly useful for machine learning workflows. In the next example, we train an XGBoost model on data from the Lending Club dataset.

import pickle

import toolz
import xgboost as xgb

import xorq as xo
import xorq.expr.datatypes as dt
import xorq.expr.udf as udf
from xorq.common.utils.toolz_utils import curry
from xorq.expr.udf import (
    make_pandas_expr_udf,
)

# Column used to impose a stable row order (and to key the train/test split).
ROWNUM = "rownum"
# Feature columns fed to the XGBoost model.
features = (
    "emp_length",
    "dti",
    "annual_inc",
    "loan_amnt",
    "fico_range_high",
    "cr_age_days",
)
# Binary label column; presumably 1 when the credit event occurred — confirm
# against the lending-club dataset schema.
target = "event_occurred"
# Name given to the model-training UDAF.
model_key = "model"
# Name of the prediction column / UDF.
prediction_key = "predicted"
# Dtype of the prediction column (XGBoost emits float32 scores).
prediction_typ = "float32"


@curry
def train_xgboost_model(df, features=features, target=target, seed=0, num_boost_round=10):
    """Train a binary-logistic XGBoost classifier on *df*.

    Parameters
    ----------
    df : pandas.DataFrame
        Training data containing the feature and target columns.
    features : tuple of str
        Names of the feature columns to use.
    target : str
        Name of the label column.
    seed : int
        Random seed passed to XGBoost for reproducibility.
    num_boost_round : int
        Number of boosting rounds (previously a hard-coded 10; now a
        backward-compatible keyword parameter).

    Returns
    -------
    xgboost.Booster
        The trained booster.
    """
    param = {"max_depth": 4, "eta": 1, "objective": "binary:logistic", "seed": seed}
    if ROWNUM in df:
        # enforce order for reproducibility
        df = df.sort_values(ROWNUM, ignore_index=True)
    X = df[list(features)]
    y = df[target]
    dtrain = xgb.DMatrix(X, y)
    return xgb.train(param, dtrain, num_boost_round=num_boost_round)


@curry
def predict_xgboost_model(model, df, features=features):
    """Score the feature columns of *df* with a trained XGBoost *model*."""
    feature_matrix = xgb.DMatrix(df[list(features)])
    return model.predict(feature_matrix)


# Lazily read the pinned Lending Club parquet dataset into a fresh backend.
t = xo.deferred_read_parquet(
    xo.connect(), xo.config.options.pins.get_path("lending-club")
)

# Deterministic split keyed on ROWNUM; test_sizes=0.7 reserves ~70% of
# rows for `test` (per the parameter name — see xo.train_test_splits docs).
(train, test) = xo.train_test_splits(
    t,
    unique_key=ROWNUM,
    test_sizes=0.7,
    random_seed=42,
)
# UDAF over the training rows: trains the model, then pickles the
# resulting booster into a binary scalar (note the compose order:
# train first, pickle.dumps second).
model_udaf = udf.agg.pandas_df(
    fn=toolz.compose(pickle.dumps, train_xgboost_model),
    schema=t[features + (target,)].schema(),
    return_type=dt.binary,
    name=model_key,
)
# Scalar UDF whose first argument is computed from the UDAF above;
# presumably the pickled bytes are deserialized by the expr-UDF
# machinery before being passed as `model` — confirm against
# make_pandas_expr_udf's documentation.
predict_expr_udf = make_pandas_expr_udf(
    computed_kwargs_expr=model_udaf.on_expr(train),
    fn=predict_xgboost_model,
    schema=t[features].schema(),
    return_type=dt.dtype(prediction_typ),
    name=prediction_key,
)
# Deferred expression appending a `predicted` column to the test rows.
expr = test.mutate(predict_expr_udf.on_expr(test).name(prediction_key))

This pattern enables an end-to-end ML workflow where:

1. The model is trained once using aggregated data
2. The trained model is serialized and passed to the prediction UDF
3. Predictions are made in the query execution context without manual intervention