User-Defined Exchange Functions

Understanding the concept and applications of User-Defined Exchange Functions in Xorq

What are UDXFs?

User-Defined Exchange Functions (UDXFs) are a specialized type of user-defined function in Xorq that enable distributed data processing over the Apache Arrow Flight protocol. Unlike traditional UDFs, which operate within a single process, UDXFs execute custom Python logic in separate processes or even remote services, making them ideal for:

  • External API integrations (calling REST APIs, databases, or third-party services)
  • Resource-intensive computations (ML model inference, heavy transformations)
  • Microservice architectures (deploying models as standalone services)
  • Process isolation (running untrusted or memory-intensive code safely)

Key Components

  1. Process Function: Your custom Python logic that transforms pandas DataFrames
  2. Schema Validation: Input/output schema specifications for type safety
  3. Flight Server: Hosts your processing function as a network service
  4. Flight Client: Transfers data to/from the server automatically

Creating UDXFs

Basic Syntax

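import pandas as pd
import xorq as xo
from toolz import curry
from xorq.expr.relations import flight_udxf

# Define your processing function; @curry lets you bind the
# parameters first and supply the DataFrame later
@curry
def my_transform(df: pd.DataFrame, param1, param2):
    # Your custom logic here; it must return a pandas DataFrame
    transformed_df = df.copy()
    return transformed_df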

# Create the UDXF
my_udxf = flight_udxf(
    process_df=my_transform(param1=value1, param2=value2),
    maybe_schema_in=input_schema.to_pyarrow(),
    maybe_schema_out=output_schema.to_pyarrow(),
    name="MyTransformation"
)

# Apply to data
result = input_expr.pipe(my_udxf)
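
The template above leaves the body of the process function open. As a minimal sketch of what one might look like, the example below adds a sentiment column to a DataFrame using only pandas. The function name `label_sentiment`, the `positive_words` parameter, and the keyword rule are hypothetical stand-ins for real model inference, and `functools.partial` is used in place of `curry` so the snippet runs without Xorq installed.

```python
from functools import partial

import pandas as pd


def label_sentiment(df: pd.DataFrame, positive_words: frozenset) -> pd.DataFrame:
    """Hypothetical process function: tag each row of `text` with a sentiment label."""

    def classify(text: str) -> str:
        # Toy rule standing in for a real model or API call
        words = set(text.lower().split())
        return "positive" if words & positive_words else "neutral"

    # Return a new DataFrame; the input columns are preserved
    return df.assign(sentiment=df["text"].map(classify))


# Bind the parameter first, leaving a one-argument DataFrame -> DataFrame callable
process_df = partial(label_sentiment, positive_words=frozenset({"great", "good"}))

sample = pd.DataFrame({"text": ["Great service", "Arrived on Tuesday"], "id": [1, 2]})
result = process_df(sample)
```

Once its parameters are bound, the callable takes a single DataFrame and returns a DataFrame, which is the shape the `process_df` argument in the template appears to expect.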

Schema Specifications

UDXFs require explicit schema definitions for type safety and optimization:

# Define input requirements
input_schema = xo.schema({"text": "string", "id": "int64"})

# Define output schema  
output_schema = xo.schema({
    "text": "string", 
    "id": "int64", 
    "sentiment": "string"
})

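# Schema validation functions
# (import path assumed; verify against your installed Xorq version)
from xorq.flight.utils import schema_concat, schema_contains

maybe_schema_in = schema_contains(input_schema)  # Validates required columns
maybe_schema_out = schema_concat(output_schema)  # Adds new columns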
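
To make the two validation styles concrete, here is a library-free sketch of the ideas suggested by the comments above: a "contains" check that accepts any input holding the required columns, and a "concat" step that appends new columns to whatever schema arrives. Plain dicts stand in for schemas, and `contains_required` and `concat_columns` are hypothetical helpers, not Xorq's implementation of `schema_contains` or `schema_concat`.

```python
def contains_required(required: dict):
    """Build a check that passes when every required column is present with its type."""

    def check(actual: dict) -> bool:
        return all(actual.get(name) == dtype for name, dtype in required.items())

    return check


def concat_columns(new_columns: dict):
    """Build a function that derives the output schema by appending new columns."""

    def derive(schema_in: dict) -> dict:
        clashes = set(schema_in) & set(new_columns)
        if clashes:
            raise ValueError(f"columns already present: {sorted(clashes)}")
        return {**schema_in, **new_columns}

    return derive


check_in = contains_required({"text": "string", "id": "int64"})
derive_out = concat_columns({"sentiment": "string"})

# An input with an extra column still satisfies the requirement
incoming = {"text": "string", "id": "int64", "source": "string"}
is_valid = check_in(incoming)
outgoing = derive_out(incoming)
```

In this sketch the output schema is derived from the input rather than fixed in advance, so extra input columns such as `source` pass through unchanged.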