>>> import pickle
>>> from sklearn.neighbors import KNeighborsClassifier
>>> from xorq.expr.udf import agg
>>> import xorq.expr.datatypes as dt
>>> import xorq as xo
agg.pandas_df
pandas_df(
    fn,
    schema,
    return_type,
    database=None,
    catalog=None,
    name=None,
    **kwargs,
)
Create a pandas DataFrame-based aggregation function.
This method creates aggregation UDFs that operate on pandas DataFrames, providing access to the full pandas ecosystem for complex aggregations. It’s particularly useful for statistical operations, machine learning model training, and complex data transformations that are easier to express with pandas.
Parameters
Name | Type | Description | Default |
---|---|---|---|
fn | callable | The aggregation function. Should accept a pandas DataFrame and return a value compatible with the return_type. The DataFrame contains all columns specified in the schema for each group. | required |
schema | Schema or dict | Input schema defining column names and their data types. | required |
return_type | DataType | The return data type of the aggregation. | required |
database | str | Database name for the UDF namespace. | None |
catalog | str | Catalog name for the UDF namespace. | None |
name | str | Name of the UDF. If None, generates a name from the function. | None |
**kwargs | | Additional configuration parameters (e.g., volatility settings). | {} |
Returns
Name | Type | Description |
---|---|---|
 | callable | A UDF constructor that can be used in aggregation expressions. |
Examples
Training a KNN classifier on penguin data as an aggregation:
>>> # Load penguins dataset
>>> penguins = xo.examples.penguins.fetch(backend=xo.connect())
>>> features = ['bill_length_mm', 'bill_depth_mm', 'flipper_length_mm', 'body_mass_g']
>>> def train_penguin_classifier(df):
...     # Remove rows with missing values
...     df_clean = df.dropna(subset=features + ['species'])
...     X = df_clean[features]
...     y = df_clean['species']
...     model = KNeighborsClassifier(n_neighbors=3)
...     model.fit(X, y)
...     return pickle.dumps(model)
>>> # Create the aggregation UDF
>>> penguin_schema = penguins.select(features + ['species']).schema()
>>> train_model_udf = agg.pandas_df(
...     fn=train_penguin_classifier,
...     schema=penguin_schema,
...     return_type=dt.binary,
...     name="train_penguin_classifier"
... )
>>> # Train one model per island
>>> trained_models = penguins.group_by("island").agg(
...     model=train_model_udf.on_expr(penguins)
... ).execute()
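The pickled bytes the UDF returns for each group can be loaded back with `pickle.loads` wherever the result is consumed. A minimal standalone sketch of that round trip (the toy measurements and labels below are made up for illustration; the real example trains on the penguins table):

```python
import pickle

from sklearn.neighbors import KNeighborsClassifier

# Stand-in training data (illustrative values, not real penguin rows).
X = [[39.1, 18.7, 181.0, 3750.0],
     [46.5, 17.9, 192.0, 3500.0],
     [50.0, 15.2, 218.0, 5700.0]]
y = ["Adelie", "Chinstrap", "Gentoo"]

model = KNeighborsClassifier(n_neighbors=1).fit(X, y)
blob = pickle.dumps(model)     # what the aggregation UDF returns per group

restored = pickle.loads(blob)  # what a downstream consumer would do
print(restored.predict([[50.0, 15.2, 218.0, 5700.0]])[0])  # "Gentoo"
```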
Complex statistical aggregation for penguin measurements:
>>> def penguin_stats(df):
...     return {
...         'bill_ratio_mean': (df['bill_length_mm'] / df['bill_depth_mm']).mean(),
...         'mass_flipper_corr': df['body_mass_g'].corr(df['flipper_length_mm']),
...         'count': len(df),
...         'size_score': (df['body_mass_g'] * df['flipper_length_mm']).mean()
...     }
>>> stats_schema = penguins.select([
...     'bill_length_mm', 'bill_depth_mm', 'body_mass_g', 'flipper_length_mm'
... ]).schema()
>>> stats_udf = agg.pandas_df(
...     fn=penguin_stats,
...     schema=stats_schema,
...     return_type=dt.Struct({
...         'bill_ratio_mean': dt.float64,
...         'mass_flipper_corr': dt.float64,
...         'count': dt.int64,
...         'size_score': dt.float64
...     }),
...     name="penguin_stats"
... )
>>> # Calculate statistics by species
>>> result = penguins.group_by("species").agg(
...     stats=stats_udf.on_expr(penguins)
... ).execute()
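Outside of xorq, the per-group semantics of the aggregation above can be mimicked with plain pandas, since `agg.pandas_df` hands the function a full DataFrame per group. A sketch on a small hand-made frame (column names follow the example; the numbers are illustrative, not real penguin data):

```python
import pandas as pd

df = pd.DataFrame({
    'species': ['Adelie', 'Adelie', 'Gentoo', 'Gentoo'],
    'bill_length_mm': [39.1, 39.5, 46.1, 50.0],
    'bill_depth_mm': [18.7, 17.4, 13.2, 16.3],
    'body_mass_g': [3750.0, 3800.0, 4500.0, 5700.0],
    'flipper_length_mm': [181.0, 186.0, 211.0, 230.0],
})

def penguin_stats(g):
    # Same shape of result as the UDF above: one dict per group.
    return {
        'bill_ratio_mean': (g['bill_length_mm'] / g['bill_depth_mm']).mean(),
        'mass_flipper_corr': g['body_mass_g'].corr(g['flipper_length_mm']),
        'count': len(g),
        'size_score': (g['body_mass_g'] * g['flipper_length_mm']).mean(),
    }

# Each group arrives as a full DataFrame, mirroring how agg.pandas_df
# invokes the UDF once per group.
per_species = {name: penguin_stats(g) for name, g in df.groupby('species')}
print(per_species['Adelie']['count'])  # 2
```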
Feature selection for penguin classification:
>>> def select_best_penguin_features(df, n_features=2):
...     from sklearn.feature_selection import mutual_info_classif
...     import pandas as pd
...
...     df_clean = df.dropna()
...     features = ['bill_length_mm', 'bill_depth_mm', 'flipper_length_mm', 'body_mass_g']
...     X = df_clean[features]
...     y = df_clean['species']
...     scores = mutual_info_classif(X, y)
...     return list(pd.Series(scores, index=features).nlargest(n_features).index)
>>> feature_selector = agg.pandas_df(
...     fn=select_best_penguin_features,
...     schema=penguins.schema(),
...     return_type=dt.Array(dt.string),
...     name="select_penguin_features"
... )
>>> # Find best features by island
>>> best_features = penguins.group_by("island").agg(
...     top_features=feature_selector.on_expr(penguins)
... ).execute()
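The mutual-information ranking inside `select_best_penguin_features` can be exercised on its own with synthetic data, which makes its behavior easy to check: a column built from the label should outrank pure noise. The `informative`/`noise` column names below are made up for this sketch:

```python
import numpy as np
import pandas as pd
from sklearn.feature_selection import mutual_info_classif

rng = np.random.default_rng(0)
y = rng.integers(0, 2, size=200)
X = pd.DataFrame({
    'informative': y * 5.0 + rng.normal(scale=0.1, size=200),  # tracks the label
    'noise': rng.normal(size=200),                             # unrelated to the label
})

# Same ranking step as in the UDF: score columns, keep the top n.
scores = mutual_info_classif(X, y, random_state=0)
ranked = list(pd.Series(scores, index=X.columns).nlargest(1).index)
print(ranked)  # the informative column should rank first
```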
Notes
- The function receives a pandas DataFrame containing all rows in each group
- PyArrow arrays are automatically converted to pandas for processing
- The function can return complex data structures (dicts, lists) if return_type supports it
- Use this when you need pandas-specific functionality or ML libraries
- Performance may be lower than PyArrow UDAFs for simple numerical operations
- Particularly powerful for ML model training workflows
See Also
agg.pyarrow : For high-performance PyArrow-based aggregations
make_pandas_expr_udf : For using trained models in prediction UDFs
make_pandas_udf : For scalar pandas operations