import pandas as pd
import xorq as xo
from xorq.expr.udf import make_pandas_udf
import xorq.vendor.ibis.expr.datatypes as dt
# Create sample data
df = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})
con = xo.connect()
t = con.register(df, "t")

# Define UDF that takes minimum value across columns
my_least = lambda df: df.min(axis=1)
# The UDF's input schema is taken from the two columns it will read.
schema = t.select(["a", "b"]).schema()
udf = make_pandas_udf(my_least, schema, dt.int64(), name="my_least")

# Apply UDF
result = t.mutate(min_val=udf.on_expr(t)).execute()
User-Defined Functions
User-defined functions (UDFs) in xorq serve as powerful tools to streamline data pipelines by:
- Reducing pipeline complexity: UDFs allow you to embed sophisticated logic directly in your data processing workflow, eliminating the need for separate processing steps or microservices.
- Maintaining data locality: Process data where it resides without moving it between environments, reducing latency and resource usage.
- Enabling code reuse: Encapsulate complex logic in functions that can be used across multiple pipelines and projects.
- Simplifying ML workflows: Seamlessly integrate model training and inference within your data pipeline, reducing the complexity of MLOps.
Overview
xorq supports three types of user-defined functions (UDFs):
- Scalar UDFs: Process data row by row
- UDAFs: Aggregate functions that process groups of rows
- UDWFs: Window functions that operate over partitions and frames
All UDFs integrate with xorq’s execution engine for optimal performance.
Scalar UDFs
The simplest type - processes one row at a time.
UDAFs (Aggregation Functions)
Process groups of rows to produce aggregate values.
from xorq.expr.udf import agg
import pyarrow.compute as pc
# Sample UDAF using PyArrow
@agg.pyarrow
def my_mean(arr: dt.float64) -> dt.float64:
    """Return the mean of ``arr`` as computed by ``pyarrow.compute.mean``.

    NOTE(review): the ``dt.float64`` annotations are presumably what
    ``agg.pyarrow`` uses to derive the UDAF's input and return types --
    confirm against the xorq UDF documentation.
    """
    return pc.mean(arr)
# Using UDAF in groupby
t = xo.examples.batting.fetch(deferred=False, backend=con, table_name="batting")
# One output row per yearID, carrying the UDAF's mean of the G column.
result = t.group_by("yearID").agg(mean_games=my_mean(t.G)).execute()
UDWFs (Window Functions)
Process partitions of data with ordering and framing.
from xorq.expr.udf import pyarrow_udwf
from xorq.vendor import ibis
import pyarrow as pa
# Define UDWF using the decorator approach
@pyarrow_udwf(
    schema=ibis.schema({"a": float}),
    return_type=ibis.dtype(float),
    alpha=0.9,
)
def exp_smooth(self, values: list[pa.Array], num_rows: int) -> pa.Array:
    """Exponentially smooth the first input column.

    The first row is passed through unchanged; every later row becomes
    ``value * alpha + previous_smoothed * (1 - alpha)``, where ``alpha`` is
    read from ``self.alpha`` (the decorator above is given ``alpha=0.9``).

    Args:
        values: Input arrays; only ``values[0]`` is read.
        num_rows: Number of rows to produce.

    Returns:
        A ``pa.Array`` holding ``num_rows`` smoothed values.
    """
    results = []
    curr_value = 0.0
    # Only the first (and, per the schema above, only) column is smoothed.
    values = values[0]
    for idx in range(num_rows):
        if idx == 0:
            # Seed the recurrence with the first observation.
            curr_value = values[idx].as_py()
        else:
            curr_value = values[idx].as_py() * self.alpha + curr_value * (1.0 - self.alpha)
        results.append(curr_value)
    return pa.array(results)
# Register data
con = xo.connect()
t = con.register(
    pa.Table.from_batches([
        pa.RecordBatch.from_arrays(
            [pa.array([1.0, 2.0, 3.0, 4.0, 5.0])],
            names=["a"]
        )
    ]),
    table_name="t"
)

# Apply UDWF with window specification
result = t.mutate(
    smoothed=exp_smooth.on_expr(t).over(ibis.window())
).execute()
Expr Scalar UDF
Expr Scalar UDFs allow you to incorporate pre-computed values (like trained models) into your UDF execution. This is particularly useful for machine learning workflows. For the next example we are going to train an XGBoost model on data from the Lending Club.
import pickle
import toolz
import xgboost as xgb
import xorq as xo
import xorq.expr.datatypes as dt
import xorq.expr.udf as udf
from xorq.common.utils.toolz_utils import curry
from xorq.expr.udf import (
make_pandas_expr_udf,
)
# Column used as the unique row key and to order rows before training.
ROWNUM = "rownum"
# Feature columns fed to the model (a tuple so it can be concatenated with
# ``(target,)`` when building the training schema).
features = (
    "emp_length",
    "dti",
    "annual_inc",
    "loan_amnt",
    "fico_range_high",
    "cr_age_days",
)
# Label column.
target = "event_occurred"
# Names given to the training UDAF and to the prediction UDF / output column.
model_key = "model"
prediction_key = "predicted"
# Data type name of the prediction column.
prediction_typ = "float32"
@curry
def train_xgboost_model(df, features=features, target=target, seed=0):
    """Train an XGBoost binary classifier on ``df``.

    Args:
        df: pandas DataFrame containing the feature columns and the target
            column (and optionally the ``ROWNUM`` column).
        features: Names of the feature columns.
        target: Name of the label column.
        seed: Seed placed in the XGBoost parameter dict.

    Returns:
        The booster returned by ``xgb.train`` after 10 boosting rounds.
    """
    param = {"max_depth": 4, "eta": 1, "objective": "binary:logistic", "seed": seed}
    num_round = 10
    if ROWNUM in df:
        # enforce order for reproducibility
        df = df.sort_values(ROWNUM, ignore_index=True)
    X = df[list(features)]
    y = df[target]
    dtrain = xgb.DMatrix(X, y)
    bst = xgb.train(param, dtrain, num_boost_round=num_round)
    return bst
@curry
def predict_xgboost_model(model, df, features=features):
    """Return ``model``'s predictions for the feature columns of ``df``.

    Args:
        model: Object exposing ``predict`` for an ``xgb.DMatrix``.
        df: pandas DataFrame containing the feature columns.
        features: Names of the feature columns to select from ``df``.

    Returns:
        Whatever ``model.predict`` returns for the selected features.
    """
    return model.predict(xgb.DMatrix(df[list(features)]))
t = xo.deferred_read_parquet(
    xo.connect(), xo.config.options.pins.get_path("lending-club")
)

(train, test) = xo.train_test_splits(
    t,
    unique_key=ROWNUM,
    test_sizes=0.7,
    random_seed=42,
)

# Aggregate UDF: trains the model on the rows it receives and returns the
# pickled booster as a single binary value.
model_udaf = udf.agg.pandas_df(
    fn=toolz.compose(pickle.dumps, train_xgboost_model),
    schema=t[features + (target,)].schema(),
    return_type=dt.binary,
    name=model_key,
)
# Expr scalar UDF: the model computed from ``train`` is supplied to
# ``predict_xgboost_model`` as a pre-computed argument.
predict_expr_udf = make_pandas_expr_udf(
    computed_kwargs_expr=model_udaf.on_expr(train),
    fn=predict_xgboost_model,
    schema=t[features].schema(),
    return_type=dt.dtype(prediction_typ),
    name=prediction_key,
)
expr = test.mutate(predict_expr_udf.on_expr(test).name(prediction_key))
This pattern enables an end-to-end ML workflow where:
1. The model is trained once using aggregated data
2. The trained model is serialized and passed to the prediction UDF
3. Predictions are made in the query execution context without manual intervention