>>> import pandas as pd
>>> import xorq as xo
>>> from xorq.common.utils.toolz_utils import curry
flight_udxf
flight_udxf(
    expr,
    process_df,
    maybe_schema_in,
    maybe_schema_out,
    name=None,
    make_server=None,
    make_connection=None,
    con=None,
    inner_name=None,
    make_udxf_kwargs=(),
    **kwargs,
)
Create a User-Defined Exchange Function (UDXF) that executes a pandas DataFrame transformation via the Apache Arrow Flight protocol.
This function wraps a pandas-based data processing function in an ephemeral Arrow Flight server, enabling distributed execution of custom user-defined functions. It creates a FlightUDXF operation that can be integrated into xorq expression pipelines for scalable data processing.
Parameters
Name | Type | Description | Default |
---|---|---|---|
expr | Expr | The input Ibis expression that provides data to the UDXF. This expression's output is streamed to the Flight server for processing. | required |
process_df | callable | A function that takes a pandas DataFrame as input and returns a transformed pandas DataFrame. It defines the core transformation logic executed on the Flight server. Expected signature: process_df(df: pd.DataFrame) -> pd.DataFrame. | required |
maybe_schema_in | Schema or callable | Input schema specification, used to validate that the input expression's schema matches expectations. Either a pyarrow Schema defining the expected input schema, or a callable that validates the input schema and returns True/False. | required |
maybe_schema_out | Schema or callable | Output schema specification, used to determine the schema of the transformed data. Either a pyarrow Schema defining the expected output schema, or a callable that computes the output schema from the input schema. | required |
name | str | Name for the resulting table in the target backend. If not provided, a unique name is generated automatically. | None |
make_server | callable | Factory function for creating the Arrow Flight server; it should return a FlightServer instance. Defaults to creating an mTLS-enabled FlightServer with client verification. | None |
make_connection | callable | Factory function for creating connections to backends, used when establishing connections during Flight operations. Defaults to xo.connect. | None |
con | Backend | Target backend connection where the result is materialized. If not provided, the backend of the input expression is used. | None |
inner_name | str | Internal name for the FlightUDXF operation. If not provided, a unique name is generated. | None |
make_udxf_kwargs | tuple | Additional keyword arguments for the UDXF creation process, given as a tuple of (key, value) pairs that is converted to a dictionary and passed to make_udxf. | () |
**kwargs | dict | Additional keyword arguments passed to the FlightUDXF constructor. | {} |
Returns
Type | Description |
---|---|
Expr | A xorq expression representing the transformed data. It can be further chained with other operations or executed to materialize the results. |
Examples
Basic sentiment analysis:
>>> @curry
... def add_sentiment(df: pd.DataFrame, input_col, output_col):
...     # Simplified sentiment analysis
...     sentiments = df[input_col].apply(lambda x: "POSITIVE" if "good" in x.lower() else "NEGATIVE")
...     return df.assign(**{output_col: sentiments})
...
>>> # Define schemas
>>> schema_in = xo.schema({"text": "string"})
>>> schema_out = xo.schema({"text": "string", "sentiment": "string"})
>>> # Create the UDXF
>>> sentiment_udxf = xo.expr.relations.flight_udxf(
...     process_df=add_sentiment(input_col="text", output_col="sentiment"),
...     maybe_schema_in=schema_in,
...     maybe_schema_out=schema_out,
...     name="SentimentAnalyzer",
... )
>>> # Apply to data
>>> data = xo.memtable({"text": ["This is good", "This is bad"]})
>>> result = data.pipe(sentiment_udxf).execute()
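Because add_sentiment is an ordinary pandas function, it can be checked locally before being wrapped in a Flight server. The sketch below uses functools.partial in place of the curried call; it only reproduces the partial application of the column names, not the Flight execution.

```python
from functools import partial

import pandas as pd


def add_sentiment(df: pd.DataFrame, input_col, output_col):
    # Same simplified rule as the example above
    sentiments = df[input_col].apply(
        lambda x: "POSITIVE" if "good" in x.lower() else "NEGATIVE"
    )
    return df.assign(**{output_col: sentiments})


# partial() stands in for the curried call: it fixes the column names and
# leaves a function that takes only the DataFrame
process_df = partial(add_sentiment, input_col="text", output_col="sentiment")

df = pd.DataFrame({"text": ["This is good", "This is bad"]})
result = process_df(df)
print(result)
```

The resulting columns ("text", "sentiment") line up with schema_out, which is a quick way to confirm the declared output schema before running the UDXF.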
Data fetching and processing:
>>> @curry
... def fetch_external_data(df, api_endpoint):
...     # Fetch additional data for each row
...     results = []
...     for _, row in df.iterrows():
...         # Simulate API call
...         enriched_data = {"id": row["id"], "enriched": f"data_for_{row['id']}"}
...         results.append(enriched_data)
...     return pd.DataFrame(results)
...
>>> fetch_udxf = xo.expr.relations.flight_udxf(
...     process_df=fetch_external_data(api_endpoint="https://api.example.com"),
...     maybe_schema_in=xo.schema({"id": "int64"}).to_pyarrow(),
...     maybe_schema_out=xo.schema({"id": "int64", "enriched": "string"}).to_pyarrow(),
...     name="DataEnricher",
... )  # quartodoc: +SKIP
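The row-by-row loop in fetch_external_data mirrors a per-row API call. When the enrichment can be computed from the columns themselves, a vectorized variant avoids iterrows, and it also keeps the id and enriched columns when a batch is empty (pd.DataFrame([]) has no columns at all, so it would not carry the declared output columns). fetch_external_data_vectorized is a hypothetical alternative, not part of the original example.

```python
import pandas as pd


def fetch_external_data_vectorized(df: pd.DataFrame, api_endpoint: str) -> pd.DataFrame:
    # Hypothetical vectorized variant; api_endpoint is unused, as in the
    # simulated original
    return df[["id"]].assign(enriched="data_for_" + df["id"].astype(str))


out = fetch_external_data_vectorized(pd.DataFrame({"id": [1, 2]}), "https://api.example.com")
empty = fetch_external_data_vectorized(
    pd.DataFrame({"id": pd.Series([], dtype="int64")}), "https://api.example.com"
)
print(out)
print(list(empty.columns))
```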
Notes
- The function uses Apache Arrow Flight for efficient data transfer between the client and server processes
- By default, the Flight server uses mTLS (mutual TLS) for secure communication
- The process_df function is executed in a separate process/server, enabling distributed processing and isolation
- Schema validation against maybe_schema_in and maybe_schema_out helps catch type mismatches early, reducing runtime errors
- The function is curried using toolz.curry, allowing partial application
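Currying is what makes data.pipe(sentiment_udxf) work in the first example: the flight_udxf(...) call there supplies every argument except expr, and pipe passes data as that missing first argument. The sketch below imitates this with a minimal, hypothetical curry decorator built on inspect and functools.partial (a simplified stand-in, not toolz.curry itself) and a hypothetical flight_udxf_standin function.

```python
import functools
import inspect


def curry(fn):
    # Minimal stand-in for toolz.curry (not its implementation): call fn once
    # every required argument is bound, otherwise return a curried partial
    sig = inspect.signature(fn)

    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        sig.bind_partial(*args, **kwargs)  # reject unknown or surplus arguments
        try:
            sig.bind(*args, **kwargs)
        except TypeError:
            # A required argument (here: expr) is still missing
            return curry(functools.partial(fn, *args, **kwargs))
        return fn(*args, **kwargs)

    return wrapper


@curry
def flight_udxf_standin(expr, process_df, maybe_schema_in, maybe_schema_out, name=None):
    # Hypothetical stand-in with flight_udxf's leading parameters; it simply
    # applies process_df to expr so the call pattern can be observed
    return process_df(expr)


# Every argument except expr: the result is a one-argument callable,
# the shape that data.pipe(sentiment_udxf) relies on
upper_udxf = flight_udxf_standin(
    process_df=str.upper, maybe_schema_in=None, maybe_schema_out=None, name="Upper"
)
print(upper_udxf("abc"))
```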