Overview

xorq supports three types of user-defined functions (UDFs):

  • Scalar UDFs: Process data row by row
  • UDAFs: Aggregate functions that process groups of rows
  • UDWFs: Window functions that operate over partitions and frames

All UDFs integrate with xorq's execution engine and run as part of the deferred expression, alongside built-in operations.

Scalar UDFs

Scalar UDFs are the simplest kind: they process one row at a time.

import pandas as pd

import xorq as xo
from xorq.expr.udf import make_pandas_udf
import xorq.vendor.ibis.expr.datatypes as dt

# Create sample data
df = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})
con = xo.connect()
t = con.register(df, "t")

# Define UDF that takes minimum value across columns
my_least = lambda df: df.min(axis=1)
schema = t.select(["a", "b"]).schema()
udf = make_pandas_udf(my_least, schema, dt.int64(), name="my_least")

# Apply UDF
result = t.mutate(min_val=udf.on_expr(t)).execute()
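
For the sample data above the row-wise minimum is deterministic, so the executed result should look like this (a sketch):

# Expected output (row-wise minimum of a and b):
#    a  b  min_val
# 0  1  4        1
# 1  2  5        2
# 2  3  6        3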

UDAFs (Aggregation Functions)

UDAFs process groups of rows to produce a single aggregate value per group.

from xorq.expr.udf import agg
import pyarrow.compute as pc

# Sample UDAF using PyArrow
@agg.pyarrow
def my_mean(arr: dt.float64) -> dt.float64:
    return pc.mean(arr)

# Use the UDAF in a group_by; assumes a "batting" table is already
# registered on the connection
t = con.table("batting")
result = t.group_by("yearID").agg(mean_games=my_mean(t.G)).execute()
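
The same aggregation can also be written over pandas DataFrames with agg.pandas_df, the same constructor used in the Expr Scalar UDF section below. A minimal sketch, reusing the batting table and the dt import from the scalar example (the name my_mean_pandas is illustrative):

# pandas-based variant: fn receives the selected columns as a DataFrame
mean_games_udaf = agg.pandas_df(
    fn=lambda df: df["G"].mean(),
    schema=t[["G"]].schema(),
    return_type=dt.float64,
    name="my_mean_pandas",
)
result = t.group_by("yearID").agg(mean_games=mean_games_udaf.on_expr(t)).execute()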

UDWFs (Window Functions)

UDWFs process partitions of data, with optional ordering and framing.

from xorq.expr.udf import pyarrow_udwf
from xorq.vendor import ibis
import pyarrow as pa

# Define UDWF using the decorator approach
@pyarrow_udwf(
    schema=ibis.schema({"a": float}),
    return_type=ibis.dtype(float),
    alpha=0.9,
)
def exp_smooth(self, values: list[pa.Array], num_rows: int) -> pa.Array:
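    # Exponentially weighted smoothing over the window frame: each new
    # value is weighted by alpha and the running value by (1 - alpha);
    # values[0] is the single input column ("a")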
    results = []
    curr_value = 0.0
    values = values[0]
    for idx in range(num_rows):
        if idx == 0:
            curr_value = values[idx].as_py()
        else:
            curr_value = values[idx].as_py() * self.alpha + curr_value * (1.0 - self.alpha)
        results.append(curr_value)
    return pa.array(results)

# Register data
con = xo.connect()
t = con.register(
    pa.Table.from_batches([
        pa.RecordBatch.from_arrays(
            [pa.array([1.0, 2.0, 3.0, 4.0, 5.0])],
            names=["a"]
        )
    ]),
    table_name="t"
)

# Apply UDWF with window specification
result = t.mutate(
    smoothed=exp_smooth.on_expr(t).over(ibis.window())
).execute()
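
With alpha=0.9 the smoothed column is deterministic: approximately 1.0, 1.9, 2.89, 3.889, 4.8889. The order in which rows reach the UDWF follows the window specification; to make it explicit, pass the standard ibis ordering (and, if needed, partitioning) arguments. A minimal sketch:

# Order rows by column a before smoothing; group_by would add partitioning
w = ibis.window(order_by="a")
result = t.mutate(smoothed=exp_smooth.on_expr(t).over(w)).execute()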

Expr Scalar UDF

Expr Scalar UDFs allow you to incorporate pre-computed values (such as trained models) into your UDF execution, which is particularly useful for machine-learning workflows. The next example trains an XGBoost model on data from the Lending Club dataset.

import toolz
import xgboost as xgb
import pickle

import xorq as xo
import xorq.expr.udf as udf
import xorq.vendor.ibis.expr.datatypes as dt
from xorq.expr.udf import make_pandas_expr_udf

# Define constants for our example
ROWNUM = "rownum"

# The lending club dataset contains loan data with features like:
# - emp_length: employment length in years
# - dti: debt-to-income ratio
# - annual_inc: annual income
# - loan_amnt: loan amount
# - fico_range_high: FICO credit score
# - cr_age_days: credit history age in days
# - event_occurred: binary target variable indicating loan default
features = (
    "emp_length",
    "dti",
    "annual_inc",
    "loan_amnt",
    "fico_range_high",
    "cr_age_days",
)
target = "event_occurred"
model_key = "model"
prediction_key = "predicted"


# Training function that returns a pickled model
@toolz.curry
def train_xgboost_model(df, features, target, seed=0):
    param = {"max_depth": 4, "eta": 1, "objective": "binary:logistic", "seed": seed}
    num_round = 10

    # Sort by rownum if present (for reproducibility)
    if ROWNUM in df:
        df = df.sort_values(ROWNUM, ignore_index=True)

    # Train the model
    X = df[list(features)]
    y = df[target]
    dtrain = xgb.DMatrix(X, y)
    bst = xgb.train(param, dtrain, num_boost_round=num_round)

    # Return pickled model
    return pickle.dumps({model_key: bst})


# Prediction function that uses the model; the deserialized payload is
# the dict returned by train_xgboost_model, so unwrap the booster first
@toolz.curry
def predict_xgboost_model(df, model, features):
    bst = model[model_key]
    return bst.predict(xgb.DMatrix(df[list(features)]))


# Set up the connection and data
con = xo.connect()
t = con.read_parquet(xo.config.options.pins.get_path("lending-club"))
train, test = xo.train_test_splits(t, unique_key=ROWNUM, test_sizes=0.7)

# Create training UDAF that returns a serialized model
model_udaf = udf.agg.pandas_df(
    fn=train_xgboost_model(features=features, target=target),
    schema=t[features + (target,)].schema(),
    return_type=dt.binary,  # Binary type for serialized model
    name=model_key,
)

# Create prediction UDF that uses the model
predict_expr_udf = make_pandas_expr_udf(
    computed_kwargs_expr=model_udaf.on_expr(train),  # training expression; evaluates to the pickled model
    fn=predict_xgboost_model(features=features),  # Function that makes predictions
    schema=t[features].schema(),  # Schema of input features
    return_type=dt.dtype("float32"),  # Return type of predictions
    name=prediction_key,
    post_process_fn=pickle.loads,  # Function to deserialize model
)

# Apply the UDF to make predictions
expr = test.mutate(predict_expr_udf.on_expr(test).name(prediction_key))

# Execute and get results
expr.execute()

This pattern enables an end-to-end ML workflow where:

  1. The model is trained once, as an aggregation over the training data
  2. The trained model is serialized and passed to the prediction UDF
  3. Predictions are made in the query execution context without manual intervention