flight_udxf

flight_udxf(
    expr,
    process_df,
    maybe_schema_in,
    maybe_schema_out,
    name=None,
    make_server=None,
    make_connection=None,
    con=None,
    inner_name=None,
    make_udxf_kwargs=(),
    **kwargs,
)

Create a User-Defined Exchange Function (UDXF) that executes a pandas DataFrame transformation via the Apache Arrow Flight protocol.

This function wraps a pandas-based processing function in an ephemeral Arrow Flight server, so the user-defined transformation runs in a separate server process rather than in the client. The result is built on a FlightUDXF operation that can be composed with other operations in an xorq expression pipeline.
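As a mental model only (this is not xorq's implementation), the round trip can be imitated in a single process: the input is cut into chunks standing in for streamed record batches, each chunk goes through process_df, and the outputs are concatenated. The sketch assumes the server applies process_df to each streamed chunk independently; the helper simulate_exchange, the chunk size, and both sample functions are hypothetical. Under that assumption, a row-local function gives the same answer however the data is chunked, while one that looks across rows (such as a rank) does not.

```python
import pandas as pd


def simulate_exchange(df, process_df, chunk_size=2):
    # Hypothetical local stand-in for the Flight round trip (not xorq code):
    # each slice plays the role of one streamed record batch.
    if df.empty:
        return process_df(df)
    chunks = [df.iloc[i:i + chunk_size] for i in range(0, len(df), chunk_size)]
    # Apply the user function to every chunk independently, then reassemble.
    return pd.concat([process_df(chunk) for chunk in chunks], ignore_index=True)


def upper_text(df):
    # Row-local: each output row depends only on its own input row.
    return df.assign(text_upper=df["text"].str.upper())


def rank_length(df):
    # Not row-local: the rank depends on which other rows share the chunk.
    return df.assign(rank=df["text"].str.len().rank(method="first").astype(int))


data = pd.DataFrame({"text": ["a", "bb", "ccc"]})
print(simulate_exchange(data, upper_text))
print(simulate_exchange(data, rank_length))
```

With two rows per chunk, rank_length yields ranks 1, 2, 1 instead of the global 1, 2, 3, which is why process_df is safest when it only uses values from the row being transformed.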

Parameters

| Name | Type | Description | Default |
|---|---|---|---|
| expr | Expr | The input Ibis expression that provides data to the UDXF. Its output is streamed to the Flight server for processing. | required |
| process_df | callable | A function that takes a pandas DataFrame and returns a transformed pandas DataFrame. It defines the core transformation executed on the Flight server. Expected signature: process_df(df: pd.DataFrame) -> pd.DataFrame | required |
| maybe_schema_in | Schema or callable | Input schema specification, either (a) a pyarrow Schema defining the expected input schema, or (b) a callable that validates the input schema and returns True or False. Used to check that the input expression's schema matches expectations. | required |
| maybe_schema_out | Schema or callable | Output schema specification, either (a) a pyarrow Schema defining the expected output schema, or (b) a callable that computes the output schema from the input schema. Determines the schema of the transformed data. | required |
| name | str | Name for the resulting table in the target backend. If omitted, a unique name is generated automatically. | None |
| make_server | callable | Factory that creates the Arrow Flight server and returns a FlightServer instance. Defaults to an mTLS-enabled FlightServer with client verification. | None |
| make_connection | callable | Factory for the backend connections established during Flight operations. Defaults to xo.connect. | None |
| con | Backend | Target backend connection where the result is materialized. If omitted, the backend of the input expression is used. | None |
| inner_name | str | Internal name for the FlightUDXF operation. If omitted, a unique name is generated. | None |
| make_udxf_kwargs | tuple | Additional keyword arguments for UDXF creation, given as a tuple of (key, value) pairs that is converted to a dictionary and passed to make_udxf. | () |
| **kwargs | dict | Additional keyword arguments passed to the FlightUDXF constructor. | {} |
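A mismatch between what process_df returns and what maybe_schema_out declares only surfaces once the Flight exchange runs, so it can help to call the function directly on a small sample first. The sketch below is a local pre-flight check, not part of xorq: check_process_df and add_length are hypothetical, and the expected column list is written out by hand to mirror the names in a declared output schema (it checks column names and order only, not types).

```python
import pandas as pd


def check_process_df(process_df, sample, expected_columns):
    # Hypothetical helper: run process_df locally on a copy of the sample
    # and verify it returns a DataFrame with the expected column names.
    result = process_df(sample.copy())
    if not isinstance(result, pd.DataFrame):
        raise TypeError(f"process_df returned {type(result).__name__}, expected DataFrame")
    if list(result.columns) != list(expected_columns):
        raise ValueError(f"columns {list(result.columns)} != expected {list(expected_columns)}")
    return result


def add_length(df):
    # Appends one derived column; the input column passes through unchanged.
    return df.assign(length=df["text"].str.len())


sample = pd.DataFrame({"text": ["good day", "bad day"]})
checked = check_process_df(add_length, sample, ["text", "length"])
print(checked)
```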

Returns

| Name | Type | Description |
|---|---|---|
| | Expr | A Xorq expression representing the transformed data. It can be chained with further operations or executed to materialize the results. |

Examples

Basic sentiment analysis:

>>> import pandas as pd
>>> import xorq as xo
>>> from xorq.common.utils.toolz_utils import curry
>>> @curry
... def add_sentiment(df: pd.DataFrame, input_col, output_col):
...     # Simplified sentiment analysis
...     sentiments = df[input_col].apply(lambda x: "POSITIVE" if "good" in x.lower() else "NEGATIVE")
...     return df.assign(**{output_col: sentiments})
>>> # Define schemas
>>> schema_in = xo.schema({"text": "string"})
>>> schema_out = xo.schema({"text": "string", "sentiment": "string"})
>>> # Create the UDXF
>>> sentiment_udxf = xo.expr.relations.flight_udxf(
...     process_df=add_sentiment(input_col="text", output_col="sentiment"),
...     maybe_schema_in=schema_in,
...     maybe_schema_out=schema_out,
...     name="SentimentAnalyzer"
... )
>>> # Apply to data
>>> data = xo.memtable({"text": ["This is good", "This is bad"]})
>>> result = data.pipe(sentiment_udxf).execute()
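The transformation in this example is ordinary pandas, so its logic can be checked without starting a Flight server. The sketch below swaps xorq's curry for functools.partial (an assumption made only to keep the snippet free of xorq) and applies the same sentiment rule to a plain DataFrame; it shows what the server-side step computes, not how xorq invokes it.

```python
import functools

import pandas as pd


def add_sentiment(df, input_col, output_col):
    # Same toy rule as the example: "good" anywhere in the text means POSITIVE.
    sentiments = df[input_col].apply(
        lambda x: "POSITIVE" if "good" in x.lower() else "NEGATIVE"
    )
    return df.assign(**{output_col: sentiments})


# functools.partial stands in for the curried call used in the example.
process_df = functools.partial(add_sentiment, input_col="text", output_col="sentiment")

df = pd.DataFrame({"text": ["This is good", "This is bad"]})
print(process_df(df))
```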

Data fetching and processing:

>>> @curry
... def fetch_external_data(df, api_endpoint):
...     # Fetch additional data for each row
...     results = []
...     for _, row in df.iterrows():
...         # Simulate API call
...         enriched_data = {"id": row["id"], "enriched": f"data_for_{row['id']}"}
...         results.append(enriched_data)
...     return pd.DataFrame(results, columns=["id", "enriched"])
>>> fetch_udxf = xo.expr.relations.flight_udxf(
...     process_df=fetch_external_data(api_endpoint="https://api.example.com"),
...     maybe_schema_in=xo.schema({"id": "int64"}).to_pyarrow(),
...     maybe_schema_out=xo.schema({"id": "int64", "enriched": "string"}).to_pyarrow(),
...     name="DataEnricher"
... ) # quartodoc: +SKIP
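Iterating with iterrows is easy to read but slow on large batches. A vectorized variant of the same simulated enrichment is sketched below; like the example, it makes no real HTTP request, and fetch_external_data_vectorized is a hypothetical name. Because the output is built from columns, an empty input batch still yields the id and enriched columns.

```python
import pandas as pd


def fetch_external_data_vectorized(df, api_endpoint):
    # Builds the same simulated "enriched" value as the row-by-row example
    # using column operations instead of iterrows. No network call is made;
    # api_endpoint is accepted only to mirror the original signature.
    return pd.DataFrame(
        {
            "id": df["id"],
            "enriched": "data_for_" + df["id"].astype(str),
        }
    )


df = pd.DataFrame({"id": [1, 2, 3]})
print(fetch_external_data_vectorized(df, api_endpoint="https://api.example.com"))
```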

Notes

  • The function uses Apache Arrow Flight for efficient data transfer between the client and server processes
  • By default, the Flight server uses mTLS (mutual TLS) for secure communication
  • The process_df function is executed in a separate process/server, enabling distributed processing and isolation
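  • Schema validation checks the input against maybe_schema_in and fixes the output schema through maybe_schema_out, which helps catch type mismatches early; it does not guard against errors raised inside process_df itself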
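  • The function is curried using toolz.curry, allowing partial application: for example, expr can be omitted and supplied later through .pipe, as in the sentiment analysis example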
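The currying note is what lets the sentiment example call flight_udxf without expr and hand the result to data.pipe. The argument flow can be imitated with functools.partial and pandas' DataFrame.pipe, which calls the given function with the frame as its first argument. In the sketch, make_transform is a hypothetical stand-in for flight_udxf, and partial is a simplification: toolz.curry returns a new partial automatically whenever required arguments are missing, whereas partial must be applied explicitly.

```python
import functools

import pandas as pd


def make_transform(expr, process_df, name=None):
    # Hypothetical stand-in for flight_udxf: the data argument comes first
    # and configuration follows, mirroring the ordering of the real signature.
    return process_df(expr)


def double(df):
    return df.assign(y=df["x"] * 2)


# Bind everything except the data argument, as the curried call does.
udxf_like = functools.partial(make_transform, process_df=double, name="Doubler")

data = pd.DataFrame({"x": [1, 2, 3]})
result = data.pipe(udxf_like)  # equivalent to udxf_like(data)
print(result)
```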