agg.pandas_df

pandas_df(
    fn,
    schema,
    return_type,
    database=None,
    catalog=None,
    name=None,
    **kwargs,
)

Create a pandas DataFrame-based aggregation function.

This method creates aggregation UDFs that operate on pandas DataFrames, providing access to the full pandas ecosystem for complex aggregations. It’s particularly useful for statistical operations, machine learning model training, and complex data transformations that are easier to express with pandas.

Parameters

Name Type Description Default
fn callable The aggregation function. Should accept a pandas DataFrame and return a value compatible with the return_type. The DataFrame contains all columns specified in the schema for each group. required
schema Schema or dict Input schema defining column names and their data types. required
return_type DataType The return data type of the aggregation. required
database str Database name for the UDF namespace. None
catalog str Catalog name for the UDF namespace. None
name str Name of the UDF. If None, generates a name from the function. None
**kwargs Additional configuration parameters (e.g., volatility settings). {}

Returns

Name Type Description
callable A UDF constructor that can be used in aggregation expressions.
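To make the per-group contract concrete, here is a minimal sketch in plain pandas (no xorq required): the function passed as `fn` receives one DataFrame per group and returns a single value. The column names and data below are illustrative, not part of the xorq API.

```python
import pandas as pd

# Sketch of the per-group call that the engine performs for a pandas_df
# aggregation: fn(group_df) -> scalar. Toy data, illustrative names only.
def mass_range(df: pd.DataFrame) -> float:
    return float(df["body_mass_g"].max() - df["body_mass_g"].min())

toy = pd.DataFrame(
    {
        "island": ["Biscoe", "Biscoe", "Dream"],
        "body_mass_g": [3700.0, 4200.0, 3400.0],
    }
)
# Emulate the grouped execution locally with a plain groupby.
per_group = {k: mass_range(g) for k, g in toy.groupby("island")}
# per_group == {"Biscoe": 500.0, "Dream": 0.0}
```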

Examples

Training a KNN classifier on penguin data as an aggregation:

>>> import pickle
>>> from sklearn.neighbors import KNeighborsClassifier
>>> from xorq.expr.udf import agg
>>> import xorq.expr.datatypes as dt
>>> import xorq as xo
>>> # Load penguins dataset
>>> penguins = xo.examples.penguins.fetch(backend=xo.connect())
>>> features = ['bill_length_mm', 'bill_depth_mm', 'flipper_length_mm', 'body_mass_g']
>>> def train_penguin_classifier(df):
...     # Remove rows with missing values
...     df_clean = df.dropna(subset=features + ['species'])
...     X = df_clean[features]
...     y = df_clean['species']
...
...     model = KNeighborsClassifier(n_neighbors=3)
...     model.fit(X, y)
...     return pickle.dumps(model)
>>> # Create the aggregation UDF
>>> penguin_schema = penguins.select(features + ['species']).schema()
>>> train_model_udf = agg.pandas_df(
...     fn=train_penguin_classifier,
...     schema=penguin_schema,
...     return_type=dt.binary,
...     name="train_penguin_classifier"
... )

>>> # Train one model per island
>>> trained_models = penguins.group_by("island").agg(
...     model=train_model_udf.on_expr(penguins)
... ).execute()
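Since the UDF returns `dt.binary`, each value in the resulting `model` column is a pickled blob; a consumer restores the fitted estimator with `pickle.loads`. The sketch below uses a plain dict as a stand-in for the sklearn model so it runs without xorq.

```python
import pickle

# Stand-in for a fitted KNeighborsClassifier (assumption: any picklable
# object round-trips the same way the trained model would).
stand_in_model = {"estimator": "KNeighborsClassifier", "n_neighbors": 3}
blob = pickle.dumps(stand_in_model)   # what the UDF returns (dt.binary)
restored = pickle.loads(blob)         # what a downstream consumer does
```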

Complex statistical aggregation for penguin measurements:

>>> def penguin_stats(df):
...     return {
...         'bill_ratio_mean': (df['bill_length_mm'] / df['bill_depth_mm']).mean(),
...         'mass_flipper_corr': df['body_mass_g'].corr(df['flipper_length_mm']),
...         'count': len(df),
...         'size_score': (df['body_mass_g'] * df['flipper_length_mm']).mean()
...     }
>>> stats_schema = penguins.select([
...     'bill_length_mm', 'bill_depth_mm', 'body_mass_g', 'flipper_length_mm'
... ]).schema()
>>> stats_udf = agg.pandas_df(
...     fn=penguin_stats,
...     schema=stats_schema,
...     return_type=dt.Struct({
...         'bill_ratio_mean': dt.float64,
...         'mass_flipper_corr': dt.float64,
...         'count': dt.int64,
...         'size_score': dt.float64
...     }),
...     name="penguin_stats"
... )
>>> # Calculate statistics by species
>>> result = penguins.group_by("species").agg(
...     stats=stats_udf.on_expr(penguins)
... ).execute()
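The same struct-returning logic can be sanity-checked locally before wiring it into a UDF. This trimmed version (two of the four fields) runs on made-up toy numbers with a plain groupby, without xorq.

```python
import pandas as pd

# Local check of the stats-per-group pattern: the function returns a dict,
# matching the dt.Struct return type. Toy data, illustrative values only.
def penguin_stats(df):
    return {
        "bill_ratio_mean": (df["bill_length_mm"] / df["bill_depth_mm"]).mean(),
        "count": len(df),
    }

toy = pd.DataFrame(
    {
        "species": ["Adelie", "Adelie", "Gentoo"],
        "bill_length_mm": [39.0, 40.0, 47.0],
        "bill_depth_mm": [19.5, 20.0, 14.1],
    }
)
stats = {k: penguin_stats(g) for k, g in toy.groupby("species")}
# stats["Adelie"] == {"bill_ratio_mean": 2.0, "count": 2}
```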

Feature selection for penguin classification:

>>> def select_best_penguin_features(df, n_features=2):
...     from sklearn.feature_selection import mutual_info_classif
...     import pandas as pd
...
...     df_clean = df.dropna()
...     features = ['bill_length_mm', 'bill_depth_mm', 'flipper_length_mm', 'body_mass_g']
...     X = df_clean[features]
...     y = df_clean['species']
...
...     scores = mutual_info_classif(X, y)
...     return list(pd.Series(scores, index=features).nlargest(n_features).index)
>>> feature_selector = agg.pandas_df(
...     fn=select_best_penguin_features,
...     schema=penguins.schema(),
...     return_type=dt.Array(dt.string),
...     name="select_penguin_features"
... )
>>> # Find best features by island
>>> best_features = penguins.group_by("island").agg(
...     top_features=feature_selector.on_expr(penguins)
... ).execute()

Notes

  • The function receives a pandas DataFrame containing all rows in each group
  • PyArrow arrays are automatically converted to pandas for processing
  • The function can return complex data structures (dicts, lists) if return_type supports it
  • Use this when you need pandas-specific functionality or ML libraries
  • Performance may be lower than PyArrow UDAFs for simple numerical operations
  • Particularly powerful for ML model training workflows

See Also

agg.pyarrow : For high-performance PyArrow-based aggregations
make_pandas_expr_udf : For using trained models in prediction UDFs
make_pandas_udf : For scalar pandas operations