# penguins_analysis.py
import pandas as pd
import xorq as xo
def analyze_penguin_health(df: pd.DataFrame) -> pd.DataFrame:
    """
    Analyze penguin health based on body mass and bill dimensions.

    This function demonstrates a typical ML preprocessing pipeline.

    Args:
        df: Penguin measurements. Must contain the ``body_mass_g`` and
            ``bill_length_mm`` columns; any other columns are passed through.

    Returns:
        A new DataFrame (``df`` itself is not modified) with three extra
        columns:

        * ``health_score`` - mean of the mass score and the bill score,
          missing when either measurement is missing.
        * ``size_category`` - "small", "medium", "large" or "unknown".
        * ``processed_at`` - a single timestamp taken when the call ran.
    """

    def categorize_size(mass):
        if pd.isna(mass):
            return "unknown"
        elif mass < 3500:
            return "small"
        elif mass < 4500:
            return "medium"
        else:
            return "large"

    # Simple health score based on mass and bill length. Each component is
    # scaled and capped at 10. Column arithmetic is used instead of a
    # row-wise apply so that missing values propagate as NaN on their own
    # and a zero-row batch still yields a proper (empty) column.
    mass_score = (df['body_mass_g'] / 1000).clip(upper=10)
    bill_score = (df['bill_length_mm'] / 10).clip(upper=10)

    return df.assign(
        health_score=(mass_score + bill_score) / 2,
        size_category=df['body_mass_g'].apply(categorize_size),
        processed_at=pd.Timestamp.now(),
    )
# Load penguins data
pandas_con = xo.pandas.connect()
penguins_data = xo.examples.penguins.fetch(backend=pandas_con)

# Filter out rows with missing critical values
filtered_penguins = penguins_data.filter(
    penguins_data.bill_length_mm.notnull(),
    penguins_data.bill_depth_mm.notnull(),
    penguins_data.flipper_length_mm.notnull(),
    penguins_data.body_mass_g.notnull(),
)

# Define schemas for the UDXF
# Input: the columns of the penguins table handed to analyze_penguin_health.
input_schema = xo.schema({
    "species": str,
    "island": str,
    "bill_length_mm": float,
    "bill_depth_mm": float,
    "flipper_length_mm": float,
    "body_mass_g": float,
    "sex": str,
    "year": int,
})

# Output: the input columns plus the three columns the function adds.
output_schema = xo.schema({
    "species": str,
    "island": str,
    "bill_length_mm": float,
    "bill_depth_mm": float,
    "flipper_length_mm": float,
    "body_mass_g": float,
    "sex": str,
    "year": int,
    "health_score": float,
    "size_category": str,
    "processed_at": "timestamp",
})

# Create the UDXF expression - this is what gets served
# The "command" value is the name clients pass to get_flight_udxf().
expr = xo.expr.relations.flight_udxf(
    filtered_penguins,
    process_df=analyze_penguin_health,
    maybe_schema_in=input_schema,
    maybe_schema_out=output_schema,
    con=xo.connect(),
    make_udxf_kwargs={
        "name": "analyze_penguin_health",
        "command": "penguin_health_analyzer",
    },
)
Serve
The xorq serve command is designed to serve expressions that contain UDXF (User-Defined Exchange Functions) nodes via Apache Arrow Flight.
What the serve Command Does
When you run xorq serve, the following happens internally:
- Loads the built expression from the specified directory
- Detects UDXF nodes within the expression
- Creates a FlightServer using FlightServer.from_udxf()
- Starts the server and blocks until stopped
The serve command is essentially a wrapper that makes UDXF-containing expressions available as network services.
Example: Penguins Health Analysis
Let’s use the same penguins health analysis example from the build and run documentation to demonstrate how the serve command works.
Step 1: The penguins_analysis.py Script
We’ll use the same penguins_analysis.py script as in the previous examples:
Step 2: Build the Expression
# Build the expression
xorq build penguins_analysis.py --expr-name expr
This creates a builds/f02d28198715/ directory with the serialized expression.
Step 3: Serve the Expression
# Serve the built expression
xorq serve builds/f02d28198715 --host 0.0.0.0 --port 8080
Key Points About UDXF Serving
The serve command is specifically designed for UDXF expressions because:
- UDXF expressions contain user-defined functions that can be efficiently served over Flight
- FlightServer.from_udxf() automatically extracts the UDXF command (“penguin_health_analyzer” in our example)
- The server hosts the function and makes it available for remote execution
- Clients can send data and get processed results back via the Flight protocol
Testing the Served UDXF
Once your penguin health analysis server is running, you can test it from another Python process:
import pandas as pd
import xorq as xo
from xorq.flight import FlightUrl

# Connect to the running server
# The port must match the --port value passed to `xorq serve` (8080 above).
url = FlightUrl(port=8080, scheme="grpc+tls", bound=False)
con = xo.flight.connect(url)

# Create test penguin data
test_data = pd.DataFrame({
    "species": ["Adelie", "Chinstrap", "Gentoo"],
    "island": ["Torgersen", "Dream", "Biscoe"],
    "bill_length_mm": [39.1, 48.7, 46.1],
    "bill_depth_mm": [18.7, 15.3, 13.2],
    "flipper_length_mm": [181.0, 196.0, 211.0],
    "body_mass_g": [3750.0, 3800.0, 4500.0],
    "sex": ["MALE", "FEMALE", "MALE"],
    "year": [2007, 2008, 2009],
})

# Convert to xorq table
test_table = xo.memtable(test_data)

# Look up the served function by the "command" name it was registered under
udxf = con.get_flight_udxf("penguin_health_analyzer")

# Get the results
results = test_table.pipe(udxf).execute()
print(results[["species", "body_mass_g", "health_score", "size_category", "processed_at"]])
Expected output:
species body_mass_g ... size_category processed_at
0 Adelie 3750.0 ... medium 2025-07-14 17:32:22.802505
1 Chinstrap 3800.0 ... medium 2025-07-14 17:32:22.802505
2 Gentoo 4500.0 ... large 2025-07-14 17:32:22.802505
Command Line Options
The serve command supports several options:
# --host             Bind to all interfaces
# --port             Specific port (default: random)
# --duckdb-path      Persistent database file
# --prometheus-port  Metrics endpoint (if available)
# Comments are kept above the command: text after a trailing backslash
# would break the line continuation.
xorq serve builds/penguins_analysis \
  --host 0.0.0.0 \
  --port 8080 \
  --duckdb-path ./data.db \
  --prometheus-port 9090
Summary
The xorq serve command is a specialized tool for serving UDXF expressions. It:
- Only works with expressions containing UDXF nodes
- Uses FlightServer.from_udxf() internally for optimized serving
- Automatically extracts UDXF commands from the expression
- Provides a simple CLI interface for deploying user-defined functions as services
This makes it easy to take any data processing function (like our penguin health analyzer) and deploy it as a high-performance network service that can be called from any Arrow Flight client.