make_pandas_expr_udf

xorq.expr.udf.make_pandas_expr_udf(
    computed_kwargs_expr,
    fn,
    schema,
    return_type=dt.binary,
    database=None,
    catalog=None,
    name=None,
    *,
    post_process_fn=unwrap_model,
    **kwargs,
)

Create an expression-based scalar UDF that incorporates pre-computed values.

This function creates a special type of scalar UDF that can access pre-computed values (like trained machine learning models) during execution. The pre-computed value is generated from a separate expression and passed to the UDF function, enabling complex workflows like model training and inference within the same query pipeline.
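The mechanism described above can be sketched without xorq. This is a minimal illustration in plain Python, not the library's implementation: `train`, `predict`, and the `calls` counter are hypothetical names, and `pickle.loads` stands in for whatever post-processing the library applies.

```python
import pickle
from functools import partial

calls = {"train": 0}  # counts how often the aggregate step runs

def train(rows):
    """Aggregate step (hypothetical): reduce all training rows to one value."""
    calls["train"] += 1
    return pickle.dumps({"mean": sum(rows) / len(rows)})

def predict(model, batch):
    """Scalar step (hypothetical): one output per input row."""
    return ["above" if x > model["mean"] else "below" for x in batch]

# Compute once, deserialize once, then bind the result as fn's first argument.
model = pickle.loads(train([1.0, 2.0, 3.0]))
udf = partial(predict, model)

# The bound function is then applied to each batch of input rows.
batches = [[0.5, 2.5], [2.0, 4.0]]
results = [udf(batch) for batch in batches]
```

The aggregate step runs a single time regardless of how many batches follow, which is the property that makes this pattern suitable for train-then-predict pipelines.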

Parameters

| Name | Type | Description | Default |
|------|------|-------------|---------|
| computed_kwargs_expr | Expression | An expression that computes a value to be passed to the UDF function. This is typically an aggregation that produces a model or other computed value. | required |
| fn | callable | The function to be executed. Should accept (computed_arg, df, **kwargs), where computed_arg is the post-processed result of computed_kwargs_expr and df is a pandas DataFrame containing the input columns. | required |
| schema | Schema | The input schema defining column names and their data types. | required |
| return_type | DataType | The return data type of the UDF. | dt.binary |
| database | str | Database name for the UDF namespace. | None |
| catalog | str | Catalog name for the UDF namespace. | None |
| name | str | Name of the UDF. If None, uses the function name. | None |
| post_process_fn | callable | Function to post-process the computed_kwargs_expr result before passing it to the main function. | unwrap_model |
| **kwargs | | Additional configuration parameters. | {} |
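The role of post_process_fn is easiest to see in isolation. The sketch below is an assumption-laden illustration: `resolve_computed_arg` is a hypothetical helper, and this page does not document what `unwrap_model` does internally. The first example further down pairs it with `pickle.dumps`, so `pickle.loads` is used here as a stand-in.

```python
import json
import pickle

def resolve_computed_arg(raw, post_process_fn):
    """Hypothetical helper: turn the raw computed value into fn's first argument."""
    return post_process_fn(raw)

# Case 1: the aggregate returned pickled bytes, so deserialize them.
pickled = pickle.dumps({"n_neighbors": 5})
from_pickle = resolve_computed_arg(pickled, pickle.loads)

# Case 2: the aggregate already returned a usable value, so pass it through.
thresholds = {"small_threshold": 3660.0, "large_threshold": 4340.0}
passthrough = resolve_computed_arg(thresholds, lambda x: x)

# Case 3: a custom decoder, here JSON text instead of pickle.
from_json = resolve_computed_arg('{"cutoff": 4000}', json.loads)
```

Whichever function is chosen, its output type should match what fn expects as its first positional argument.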

Returns

| Name | Type | Description |
|------|------|-------------|
| | callable | A UDF constructor that can be used in expressions. |

Examples

Machine learning workflow with penguin species classification:

>>> import pickle
>>> import pandas as pd
>>> from sklearn.neighbors import KNeighborsClassifier
>>> from xorq.expr.udf import make_pandas_expr_udf, agg
>>> import xorq.expr.datatypes as dt
>>> import xorq as xo
>>> # Load penguins dataset
>>> penguins = xo.examples.penguins.fetch(backend=xo.connect())
>>> features = ['bill_length_mm', 'bill_depth_mm', 'flipper_length_mm', 'body_mass_g']
>>> # Split data
>>> train_data = penguins.filter(penguins.year < 2009)
>>> test_data = penguins.filter(penguins.year >= 2009)
>>> # Define training function
>>> def train_penguin_model(df):
...     df_clean = df.dropna(subset=features + ['species'])
...     X = df_clean[features]
...     y = df_clean['species']
...
...     model = KNeighborsClassifier(n_neighbors=5)
...     model.fit(X, y)
...     return pickle.dumps(model)
>>> # Define prediction function
>>> def predict_penguin_species(model, df):
...     df_clean = df.dropna(subset=features)
...     X = df_clean[features]
...     predictions = model.predict(X)
...     # Return predictions for all rows (fill NaN for missing data)
...     result = pd.Series(index=df.index, dtype='object')
...     result.loc[df_clean.index] = predictions
...     return result.fillna('Unknown')
>>> # Create schemas for training and prediction
>>> train_schema = train_data.select(features + ['species']).schema()
>>> test_schema = test_data.select(features).schema()
>>> # Create model training UDAF
>>> model_udaf = agg.pandas_df(
...     fn=train_penguin_model,
...     schema=train_schema,
...     return_type=dt.binary,
...     name="train_penguin_model"
... )
>>> # Create prediction UDF that uses trained model
>>> predict_udf = make_pandas_expr_udf(
...     computed_kwargs_expr=model_udaf.on_expr(train_data),
...     fn=predict_penguin_species,
...     schema=test_schema,
...     return_type=dt.string,
...     name="predict_species"
... )
>>> # Apply predictions to test data
>>> result = test_data.mutate(
...     predicted_species=predict_udf.on_expr(test_data)
... ).execute()
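The prediction function above has to return one value per input row even though rows with missing features are skipped during prediction. The alignment pattern can be tried on its own with pandas. In this sketch the DataFrame, the two-column feature list, and `fake_predict` are made up for illustration, and `fake_predict` merely stands in for `model.predict`.

```python
import pandas as pd

features = ["bill_length_mm", "body_mass_g"]  # shortened list, for illustration

df = pd.DataFrame(
    {
        "bill_length_mm": [39.1, None, 46.5],
        "body_mass_g": [3750.0, 3800.0, 5200.0],
    }
)

def fake_predict(X):
    """Hypothetical stand-in for model.predict: one label per clean row."""
    return ["Gentoo" if mass > 4500 else "Adelie" for mass in X["body_mass_g"]]

def predict_aligned(df, predict_fn):
    # Predict only on rows with complete features.
    df_clean = df.dropna(subset=features)
    # Start from a full-length result so skipped rows keep a placeholder.
    result = pd.Series("Unknown", index=df.index, dtype="object")
    result.loc[df_clean.index] = predict_fn(df_clean[features])
    return result

out = predict_aligned(df, fake_predict)
```

Because the result is indexed by the original `df.index`, its length always equals the number of input rows.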

Penguin size classification with pre-computed thresholds:

>>> def compute_size_thresholds(df):
...     df_clean = df.dropna(subset=['body_mass_g'])
...     return {
...         'small_threshold': df_clean['body_mass_g'].quantile(0.33),
...         'large_threshold': df_clean['body_mass_g'].quantile(0.67)
...     }
>>> def classify_penguin_size(thresholds, df):
...     def classify_size(mass):
...         if pd.isna(mass):
...             return 'Unknown'
...         elif mass < thresholds['small_threshold']:
...             return 'Small'
...         elif mass > thresholds['large_threshold']:
...             return 'Large'
...         else:
...             return 'Medium'
...
...     return df['body_mass_g'].apply(classify_size)
>>> # Create threshold computation UDAF
>>> threshold_udaf = agg.pandas_df(
...     fn=compute_size_thresholds,
...     schema=penguins.select(['body_mass_g']).schema(),
...     return_type=dt.Struct({
...         'small_threshold': dt.float64,
...         'large_threshold': dt.float64
...     }),
...     name="compute_thresholds"
... )
>>> # Create size classification UDF
>>> size_classify_udf = make_pandas_expr_udf(
...     computed_kwargs_expr=threshold_udaf.on_expr(penguins),
...     fn=classify_penguin_size,
...     schema=penguins.select(['body_mass_g']).schema(),
...     return_type=dt.string,
...     name="classify_size",
...     post_process_fn=lambda x: x  # thresholds are already a dict
... )
>>> # Apply size classification
>>> result = penguins.mutate(
...     size_category=size_classify_udf.on_expr(penguins)
... ).execute()
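Both functions in this example are ordinary pandas code, so they can be checked on a small in-memory DataFrame before being wrapped in a UDAF and a UDF. The sketch below repeats them and passes the thresholds dict directly. The sample masses are invented, and no xorq machinery is involved.

```python
import pandas as pd

def compute_size_thresholds(df):
    df_clean = df.dropna(subset=["body_mass_g"])
    return {
        "small_threshold": df_clean["body_mass_g"].quantile(0.33),
        "large_threshold": df_clean["body_mass_g"].quantile(0.67),
    }

def classify_penguin_size(thresholds, df):
    def classify_size(mass):
        if pd.isna(mass):
            return "Unknown"
        elif mass < thresholds["small_threshold"]:
            return "Small"
        elif mass > thresholds["large_threshold"]:
            return "Large"
        else:
            return "Medium"

    return df["body_mass_g"].apply(classify_size)

# Invented sample data: five known masses and one missing value.
sample = pd.DataFrame({"body_mass_g": [3000.0, 3500.0, 4000.0, 4500.0, 5000.0, None]})

thresholds = compute_size_thresholds(sample)
sizes = classify_penguin_size(thresholds, sample)
```

With pandas' default linear interpolation, the thresholds for this sample fall between 3500 and 4000 and between 4000 and 4500, so only the 4000 g row is classified as Medium.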

Notes

This UDF type is particularly powerful for ML workflows where you need to:

1. Train a model on aggregated data
2. Serialize the trained model
3. Use the model for predictions on new data

The computed_kwargs_expr is evaluated once and its result is passed to every invocation of the main function, enabling efficient model reuse.

See Also

make_pandas_udf : For standard pandas-based scalar UDFs
agg : For aggregation functions