>>> import pickle
>>> import pandas as pd
>>> from sklearn.neighbors import KNeighborsClassifier
>>> from xorq.expr.udf import make_pandas_expr_udf, agg
>>> import xorq.expr.datatypes as dt
>>> import xorq as xo
make_pandas_expr_udf
xorq.expr.udf.make_pandas_expr_udf(
    computed_kwargs_expr,
    fn,
    schema,
    return_type=dt.binary,
    database=None,
    catalog=None,
    name=None,
    *,
    post_process_fn=unwrap_model,
    **kwargs,
)
Create an expression-based scalar UDF that incorporates pre-computed values.
This function creates a special type of scalar UDF that can access pre-computed values (like trained machine learning models) during execution. The pre-computed value is generated from a separate expression and passed to the UDF function, enabling complex workflows like model training and inference within the same query pipeline.
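At a glance, the pattern has three parts: an aggregation that produces the pre-computed value, a scalar function that receives that value together with each batch of rows, and the UDF constructor that ties the two together. Here is a minimal sketch (the mean/demean names are illustrative, not part of the library, and it assumes a plain scalar computed value can be passed through unchanged via post_process_fn, mirroring the threshold example below):
>>> import xorq as xo
>>> import xorq.expr.datatypes as dt
>>> from xorq.expr.udf import make_pandas_expr_udf, agg
>>> t = xo.examples.penguins.fetch(backend=xo.connect())
>>> # 1. An aggregation expression that produces the pre-computed value
>>> mean_udaf = agg.pandas_df(
...     fn=lambda df: float(df['body_mass_g'].mean()),
...     schema=t.select(['body_mass_g']).schema(),
...     return_type=dt.float64,
...     name="mean_body_mass"
... )
>>> # 2. A scalar UDF whose fn receives (computed_value, batch_df)
>>> demean_udf = make_pandas_expr_udf(
...     computed_kwargs_expr=mean_udaf.on_expr(t),
...     fn=lambda mean, df: df['body_mass_g'] - mean,
...     schema=t.select(['body_mass_g']).schema(),
...     return_type=dt.float64,
...     name="demean_body_mass",
...     post_process_fn=lambda x: x  # assumed: a plain float needs no unwrapping
... )
>>> # 3. Apply it like any other column expression
>>> t.mutate(demeaned=demean_udf.on_expr(t)).execute()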
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| computed_kwargs_expr | Expression | An expression that computes a value to be passed to the UDF function. This is typically an aggregation that produces a model or other computed value. | required |
| fn | callable | The function to be executed. Should accept (computed_arg, df, **kwargs), where computed_arg is the result of computed_kwargs_expr and df is a pandas DataFrame containing the input columns. | required |
| schema | Schema | The input schema defining column names and their data types. | required |
| return_type | DataType | The return data type of the UDF. | dt.binary |
| database | str | Database name for the UDF namespace. | None |
| catalog | str | Catalog name for the UDF namespace. | None |
| name | str | Name of the UDF. If None, uses the function name. | None |
| post_process_fn | callable | Function to post-process the computed_kwargs_expr result before it is passed to the main function. | unwrap_model |
| **kwargs | | Additional configuration parameters. | {} |
Returns
| Name | Type | Description |
|---|---|---|
| | callable | A UDF constructor that can be used in expressions. |
Examples
Machine learning workflow with penguin species classification:
>>> # Load penguins dataset
>>> penguins = xo.examples.penguins.fetch(backend=xo.connect())
>>> features = ['bill_length_mm', 'bill_depth_mm', 'flipper_length_mm', 'body_mass_g']
>>> # Split data
>>> train_data = penguins.filter(penguins.year < 2009)
>>> test_data = penguins.filter(penguins.year >= 2009)
>>> # Define training function
>>> def train_penguin_model(df):
...     df_clean = df.dropna(subset=features + ['species'])
...     X = df_clean[features]
...     y = df_clean['species']
...     model = KNeighborsClassifier(n_neighbors=5)
...     model.fit(X, y)
...     return pickle.dumps(model)
>>> # Define prediction function
>>> def predict_penguin_species(model, df):
...     df_clean = df.dropna(subset=features)
...     X = df_clean[features]
...     predictions = model.predict(X)
...     # Return predictions for all rows (fill NaN for missing data)
...     result = pd.Series(index=df.index, dtype='object')
...     result.loc[df_clean.index] = predictions
...     return result.fillna('Unknown')
>>> # Create schemas for training and prediction
>>> train_schema = train_data.select(features + ['species']).schema()
>>> test_schema = test_data.select(features).schema()
>>> # Create model training UDAF
>>> model_udaf = agg.pandas_df(
...     fn=train_penguin_model,
...     schema=train_schema,
...     return_type=dt.binary,
...     name="train_penguin_model"
... )
>>> # Create prediction UDF that uses trained model
>>> predict_udf = make_pandas_expr_udf(
...     computed_kwargs_expr=model_udaf.on_expr(train_data),
...     fn=predict_penguin_species,
...     schema=test_schema,
...     return_type=dt.string,
...     name="predict_species"
... )
>>> # Apply predictions to test data
>>> result = test_data.mutate(
...     predicted_species=predict_udf.on_expr(test_data)
... ).execute()
Penguin size classification with pre-computed thresholds:
>>> def compute_size_thresholds(df):
...     df_clean = df.dropna(subset=['body_mass_g'])
...     return {
...         'small_threshold': df_clean['body_mass_g'].quantile(0.33),
...         'large_threshold': df_clean['body_mass_g'].quantile(0.67)
...     }
>>> def classify_penguin_size(thresholds, df):
...     def classify_size(mass):
...         if pd.isna(mass):
...             return 'Unknown'
...         elif mass < thresholds['small_threshold']:
...             return 'Small'
...         elif mass > thresholds['large_threshold']:
...             return 'Large'
...         else:
...             return 'Medium'
...
...     return df['body_mass_g'].apply(classify_size)
>>> # Create threshold computation UDAF
>>> threshold_udaf = agg.pandas_df(
...     fn=compute_size_thresholds,
...     schema=penguins.select(['body_mass_g']).schema(),
...     return_type=dt.Struct({
...         'small_threshold': dt.float64,
...         'large_threshold': dt.float64
...     }),
...     name="compute_thresholds"
... )
>>> # Create size classification UDF
>>> size_classify_udf = make_pandas_expr_udf(
...     computed_kwargs_expr=threshold_udaf.on_expr(penguins),
...     fn=classify_penguin_size,
...     schema=penguins.select(['body_mass_g']).schema(),
...     return_type=dt.string,
...     name="classify_size",
...     post_process_fn=lambda x: x  # thresholds are already a dict
... )
>>> # Apply size classification
>>> result = penguins.mutate(
...     size_category=size_classify_udf.on_expr(penguins)
... ).execute()
Notes
This UDF type is particularly powerful for ML workflows where you need to:

1. Train a model on aggregated data
2. Serialize the trained model
3. Use the model for predictions on new data
The computed_kwargs_expr is evaluated once and its result is passed to every invocation of the main function, enabling efficient model reuse.
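Concretely, for the classification example above, the semantics are roughly the following (a conceptual sketch of what the engine does, not a real xorq API):
>>> blob = train_penguin_model(train_data.execute())  # computed_kwargs_expr: runs once
>>> model = pickle.loads(blob)  # post_process_fn (unwrap_model) conceptually inverts pickle.dumps
>>> preds = predict_penguin_species(model, test_data.execute())  # fn: runs per batch of rows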
See Also
make_pandas_udf : For standard pandas-based scalar UDFs
agg : For aggregation functions