```python
>>> import pickle
>>> import pandas as pd
>>> from sklearn.neighbors import KNeighborsClassifier
>>> from xorq.expr.udf import make_pandas_expr_udf, agg
>>> import xorq.expr.datatypes as dt
>>> import xorq.api as xo
```

make_pandas_expr_udf
```python
xorq.expr.udf.make_pandas_expr_udf(
    computed_kwargs_expr,
    fn,
    schema,
    return_type=dt.binary,
    database=None,
    catalog=None,
    name=None,
    *,
    post_process_fn=unwrap_model,
    **kwargs,
)
```

Create an expression-based scalar UDF that incorporates pre-computed values.
This function creates a special kind of scalar UDF that can access a pre-computed value (such as a trained machine learning model) during execution. The pre-computed value is produced by a separate expression, passed through post_process_fn, and then given to fn as its first argument, so that model training and inference can run within the same query pipeline.
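Outside a query engine, the data flow described above can be approximated with plain pandas. This is a sketch under stated assumptions, not xorq's implementation: emulate_expr_udf, compute_threshold and label_heavy are hypothetical names, and the sketch assumes the pre-computed value is produced once from the training rows, post-processed, and then handed to fn as its first positional argument together with each batch of input rows.

```python
import pandas as pd


def emulate_expr_udf(train_df, compute_fn, fn, batches, post_process_fn=lambda x: x):
    """Hypothetical stand-in for the make_pandas_expr_udf data flow (not xorq code)."""
    # Step 1: compute the shared value once from the training rows.
    computed = post_process_fn(compute_fn(train_df))
    # Step 2: call fn(computed_arg, df) on every batch and concatenate the results.
    return pd.concat([fn(computed, batch) for batch in batches])


def compute_threshold(df):
    # A single pre-computed value: the median body mass of the training rows.
    return {"threshold": df["body_mass_g"].median()}


def label_heavy(computed, df):
    # Compare each input row against the pre-computed threshold.
    heavy = df["body_mass_g"] > computed["threshold"]
    return heavy.map({True: "Heavy", False: "Light"})


train = pd.DataFrame({"body_mass_g": [3000.0, 3500.0, 4000.0, 4500.0, 5000.0]})
batches = [
    pd.DataFrame({"body_mass_g": [3200.0, 4800.0]}, index=[0, 1]),
    pd.DataFrame({"body_mass_g": [4000.0]}, index=[2]),
]
labels = emulate_expr_udf(train, compute_threshold, label_heavy, batches)
print(labels.tolist())  # ['Light', 'Heavy', 'Light']
```

The second batch shows that the threshold is not recomputed from the rows being scored; every batch is compared against the value derived from the training data.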
Parameters
| Name | Type | Description | Default | 
|---|---|---|---|
| computed_kwargs_expr | Expression | An expression that computes a value to be passed to the UDF function. This is typically an aggregation that produces a model or other computed value. | required | 
| fn | callable | The function to be executed. Should accept (computed_arg, df, **kwargs) where computed_arg is the result of computed_kwargs_expr and df is a pandas DataFrame containing the input columns. | required | 
| schema | Schema | The input schema defining column names and their data types. | required | 
| return_type | DataType | The return data type of the UDF. | dt.binary | 
| database | str | Database name for the UDF namespace. | None | 
| catalog | str | Catalog name for the UDF namespace. | None | 
| name | str | Name of the UDF. If None, uses the function name. | None | 
| post_process_fn | callable | Function to post-process the computed_kwargs_expr result before passing to the main function. | unwrap_model | 
| **kwargs | | Additional configuration parameters. | {} | 
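The examples on this page hint at what post_process_fn is for: the first returns pickle.dumps(model) with a dt.binary return type and keeps the default unwrap_model, while the second returns a plain dict and passes an identity function. On that basis only, the sketch below assumes the default step behaves like pickle.loads. unpickle_value and identity are hypothetical stand-ins, not xorq's unwrap_model.

```python
import pickle


def unpickle_value(value):
    """Hypothetical stand-in for the default post-processing step.

    Assumes the aggregation returned pickled bytes (a dt.binary result, as in
    the first example). Illustration only; not xorq's unwrap_model.
    """
    return pickle.loads(value)


def identity(value):
    # Mirrors the second example's `lambda x: x` for values that need no decoding.
    return value


# A binary-typed aggregation might return serialized bytes like these.
payload = pickle.dumps({"cutoff": 4000.0, "labels": ("Small", "Large")})

# After post-processing, fn receives an ordinary Python object again.
model = unpickle_value(payload)
print(model["cutoff"])  # 4000.0

# A struct-typed aggregation already yields a dict, so identity is enough.
thresholds = identity({"small_threshold": 3550.0, "large_threshold": 4550.0})
print(thresholds["large_threshold"])  # 4550.0
```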
Returns
| Name | Type | Description | 
|---|---|---|
| | callable | A UDF constructor that can be used in expressions. | 
Examples
Machine learning workflow with penguin species classification:
```python
>>> # Load penguins dataset
>>> penguins = xo.examples.penguins.fetch(backend=xo.connect())
>>> features = ['bill_length_mm', 'bill_depth_mm', 'flipper_length_mm', 'body_mass_g']
>>> # Split data by year
>>> train_data = penguins.filter(penguins.year < 2009)
>>> test_data = penguins.filter(penguins.year >= 2009)
>>> # Define training function
>>> def train_penguin_model(df):
...     df_clean = df.dropna(subset=features + ['species'])
...     X = df_clean[features]
...     y = df_clean['species']
...
...     model = KNeighborsClassifier(n_neighbors=5)
...     model.fit(X, y)
...     return pickle.dumps(model)
>>> # Define prediction function
>>> def predict_penguin_species(model, df):
...     df_clean = df.dropna(subset=features)
...     X = df_clean[features]
...     predictions = model.predict(X)
...     # Return a prediction for every row; rows with missing features become 'Unknown'
...     result = pd.Series(index=df.index, dtype='object')
...     result.loc[df_clean.index] = predictions
...     return result.fillna('Unknown')
>>> # Create schemas for training and prediction
>>> train_schema = train_data.select(features + ['species']).schema()
>>> test_schema = test_data.select(features).schema()
>>> # Create model training UDAF
>>> model_udaf = agg.pandas_df(
...     fn=train_penguin_model,
...     schema=train_schema,
...     return_type=dt.binary,
...     name="train_penguin_model"
... )
>>> # Create prediction UDF that uses the trained model
>>> predict_udf = make_pandas_expr_udf(
...     computed_kwargs_expr=model_udaf.on_expr(train_data),
...     fn=predict_penguin_species,
...     schema=test_schema,
...     return_type=dt.string,
...     name="predict_species"
... )
>>> # Apply predictions to test data
>>> result = test_data.mutate(
...     predicted_species=predict_udf.on_expr(test_data)
... ).execute()
```

Penguin size classification with pre-computed thresholds:
```python
>>> # Compute tercile thresholds on body mass
>>> def compute_size_thresholds(df):
...     df_clean = df.dropna(subset=['body_mass_g'])
...     return {
...         'small_threshold': df_clean['body_mass_g'].quantile(0.33),
...         'large_threshold': df_clean['body_mass_g'].quantile(0.67)
...     }
>>> # Define classification function
>>> def classify_penguin_size(thresholds, df):
...     def classify_size(mass):
...         if pd.isna(mass):
...             return 'Unknown'
...         elif mass < thresholds['small_threshold']:
...             return 'Small'
...         elif mass > thresholds['large_threshold']:
...             return 'Large'
...         else:
...             return 'Medium'
...
...     return df['body_mass_g'].apply(classify_size)
>>> # Create threshold computation UDAF
>>> threshold_udaf = agg.pandas_df(
...     fn=compute_size_thresholds,
...     schema=penguins.select(['body_mass_g']).schema(),
...     return_type=dt.Struct({
...         'small_threshold': dt.float64,
...         'large_threshold': dt.float64
...     }),
...     name="compute_thresholds"
... )
>>> # Create size classification UDF
>>> size_classify_udf = make_pandas_expr_udf(
...     computed_kwargs_expr=threshold_udaf.on_expr(penguins),
...     fn=classify_penguin_size,
...     schema=penguins.select(['body_mass_g']).schema(),
...     return_type=dt.string,
...     name="classify_size",
...     post_process_fn=lambda x: x  # thresholds are already a dict
... )
>>> # Apply size classification
>>> result = penguins.mutate(
...     size_category=size_classify_udf.on_expr(penguins)
... ).execute()
```

Notes
This UDF type is particularly useful for ML workflows where you need to:

1. Train a model with an aggregation over the training data
2. Serialize the trained model
3. Use the model for predictions on new data
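The three steps can be sketched end to end without a query engine. To keep the sketch dependency-light, it swaps in a nearest-centroid classifier for KNeighborsClassifier; FEATURES, train_centroids and predict_nearest are hypothetical names, the explicit pickle.loads call assumes the unpickling that the default post-processing performs in the first example, and missing-value handling is omitted.

```python
import pickle

import pandas as pd

FEATURES = ["bill_length_mm", "flipper_length_mm"]  # hypothetical feature subset


def train_centroids(df):
    # 1. Train: aggregate labelled rows into one mean vector per species
    #    (a nearest-centroid stand-in for KNeighborsClassifier).
    clean = df.dropna(subset=FEATURES + ["species"])
    centroids = clean.groupby("species")[FEATURES].mean()
    # 2. Serialize: return bytes, matching a dt.binary return type.
    return pickle.dumps(centroids)


def predict_nearest(centroids, df):
    # 3. Predict: assign each row to the closest centroid by squared Euclidean distance.
    X = df[FEATURES].to_numpy()
    C = centroids.to_numpy()
    dists = ((X[:, None, :] - C[None, :, :]) ** 2).sum(axis=2)
    return pd.Series(centroids.index[dists.argmin(axis=1)], index=df.index)


train = pd.DataFrame({
    "bill_length_mm": [38.0, 39.0, 47.0, 48.0],
    "flipper_length_mm": [185.0, 190.0, 215.0, 220.0],
    "species": ["Adelie", "Adelie", "Gentoo", "Gentoo"],
})
new = pd.DataFrame({"bill_length_mm": [38.5, 47.5], "flipper_length_mm": [188.0, 218.0]})

model = pickle.loads(train_centroids(train))  # deserialize before predicting
print(predict_nearest(model, new).tolist())  # ['Adelie', 'Gentoo']
```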
The computed_kwargs_expr is evaluated once and its result is passed to every invocation of the main function, enabling efficient model reuse.
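The evaluate-once property can be pictured with a call counter. The sketch below models it in plain Python using functools.lru_cache, a caching mechanism swapped in purely for illustration; shared_threshold and score are hypothetical names, and the sketch says nothing about how xorq actually schedules computed_kwargs_expr.

```python
import functools

import pandas as pd

calls = {"train": 0}


@functools.lru_cache(maxsize=None)
def shared_threshold():
    # Expensive step (stand-in for computed_kwargs_expr); count how often it runs.
    calls["train"] += 1
    train = pd.Series([3000.0, 4000.0, 5000.0])
    return float(train.mean())


def score(df):
    # Every batch reuses the cached value instead of recomputing it.
    return df["body_mass_g"] > shared_threshold()


batches = [
    pd.DataFrame({"body_mass_g": [3500.0, 4500.0]}),
    pd.DataFrame({"body_mass_g": [5200.0]}),
]
results = [score(b).tolist() for b in batches]
print(results)         # [[False, True], [True]]
print(calls["train"])  # 1
```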
See Also
make_pandas_udf : For standard pandas-based scalar UDFs

agg : For aggregation functions