Serve

The xorq serve command serves expressions that contain UDXF (User-Defined Exchange Function) nodes over Apache Arrow Flight.

What the serve Command Does

When you run xorq serve, the following happens internally:

  1. Loads the built expression from the specified directory
  2. Detects UDXF nodes within the expression
  3. Creates a FlightServer using FlightServer.from_udxf()
  4. Starts the server and blocks until stopped

The serve command is a thin wrapper that exposes UDXF-containing expressions as network services.

Example: Penguins Health Analysis

Let’s use the same penguins health analysis example from the build and run documentation to demonstrate how the serve command works.

Step 1: The penguins_analysis.py Script

The script defines the processing function, the input and output schemas, and the UDXF expression that gets served:

# penguins_analysis.py
import pandas as pd
import xorq as xo

def analyze_penguin_health(df: pd.DataFrame) -> pd.DataFrame:
    """
    Analyze penguin health based on body mass and bill dimensions.
    This function demonstrates a typical ML preprocessing pipeline.
    """
    def calculate_health_score(row):
        if pd.isna(row['body_mass_g']) or pd.isna(row['bill_length_mm']):
            return None
        
        # Simple health score: the mean of a mass score and a bill-length score
        mass_score = min(row['body_mass_g'] / 1000, 10)  # grams / 1000, capped at 10
        bill_score = min(row['bill_length_mm'] / 10, 10)  # mm / 10, capped at 10
        return (mass_score + bill_score) / 2
    
    def categorize_size(mass):
        if pd.isna(mass):
            return "unknown"
        elif mass < 3500:
            return "small"
        elif mass < 4500:
            return "medium"
        else:
            return "large"
    
    return df.assign(
        health_score=df.apply(calculate_health_score, axis=1),
        size_category=df['body_mass_g'].apply(categorize_size),
        processed_at=pd.Timestamp.now()
    )

# Load penguins data
pandas_con = xo.pandas.connect()
penguins_data = xo.examples.penguins.fetch(backend=pandas_con)

# Filter out rows with missing critical values
filtered_penguins = penguins_data.filter(
    penguins_data.bill_length_mm.notnull(),
    penguins_data.bill_depth_mm.notnull(),
    penguins_data.flipper_length_mm.notnull(),
    penguins_data.body_mass_g.notnull(),
)

# Define schemas for the UDXF
input_schema = xo.schema({
    "species": str,
    "island": str,
    "bill_length_mm": float,
    "bill_depth_mm": float,
    "flipper_length_mm": float,
    "body_mass_g": float,
    "sex": str,
    "year": int
})

output_schema = xo.schema({
    "species": str,
    "island": str,
    "bill_length_mm": float,
    "bill_depth_mm": float,
    "flipper_length_mm": float,
    "body_mass_g": float,
    "sex": str,
    "year": int,
    "health_score": float,
    "size_category": str,
    "processed_at": "timestamp"
})

# Create the UDXF expression - this is what gets served
expr = xo.expr.relations.flight_udxf(
    filtered_penguins,
    process_df=analyze_penguin_health,
    maybe_schema_in=input_schema,
    maybe_schema_out=output_schema,
    con=xo.connect(),
    make_udxf_kwargs={
        "name": "analyze_penguin_health",
        "command": "penguin_health_analyzer"
    }
)
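
The output schema in the script repeats every input column and then adds three new ones. As a minimal sketch of one way to avoid that duplication, the field mappings can be kept as plain dictionaries and merged before being passed to xo.schema as the script does. The names input_fields, added_fields, and output_fields are illustrative and not part of xorq.

```python
# Field mappings that mirror the schemas in penguins_analysis.py.
input_fields = {
    "species": str,
    "island": str,
    "bill_length_mm": float,
    "bill_depth_mm": float,
    "flipper_length_mm": float,
    "body_mass_g": float,
    "sex": str,
    "year": int,
}

# Columns that analyze_penguin_health adds via df.assign(...).
added_fields = {
    "health_score": float,
    "size_category": str,
    "processed_at": "timestamp",
}

# Dict merging preserves insertion order, so the input columns come first.
output_fields = {**input_fields, **added_fields}

# In the script these would be wrapped as xo.schema(input_fields)
# and xo.schema(output_fields).
print(list(output_fields))
```

Keeping the added columns in one place makes it harder for the declared output schema to drift from what the function actually returns.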

Step 2: Build the Expression

# Build the expression
xorq build penguins_analysis.py --expr-name expr

This creates a directory such as builds/f02d28198715/ containing the serialized expression. The directory name is generated by the build, so yours may differ.

Step 3: Serve the Expression

# Serve the built expression
xorq serve builds/f02d28198715 --host 0.0.0.0 --port 8080
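
Because xorq serve blocks until stopped, a client script started alongside it may need to wait until the port accepts connections. The following is a minimal sketch using only the Python standard library. The helper name wait_for_port is hypothetical, and it checks plain TCP reachability only, not that a Flight server is answering.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 10.0, interval: float = 0.2) -> bool:
    """Return True once a TCP connection to (host, port) succeeds, False on timeout."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # A successful connect means something is listening on the port.
            with socket.create_connection((host, port), timeout=1.0):
                return True
        except OSError:
            # Connection refused or timed out: retry until the deadline passes.
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)
```

For the command above, a call such as wait_for_port("localhost", 8080) would return True once the server is listening on the port passed to --port.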

Key Points About UDXF Serving

The serve command is specifically designed for UDXF expressions because:

  1. UDXF expressions contain user-defined functions that can be efficiently served over Flight
  2. FlightServer.from_udxf() automatically extracts the UDXF command (“penguin_health_analyzer” in our example)
  3. The server hosts the function and makes it available for remote execution
  4. Clients can send data and get processed results back via the Flight protocol

Testing the Served UDXF

Once your penguin health analysis server is running, you can test it from another Python process:

import pandas as pd

import xorq as xo
from xorq.flight import FlightUrl

# Connect to the running server; the port must match the --port passed to xorq serve
url = FlightUrl(port=8080, scheme="grpc+tls", bound=False)
con = xo.flight.connect(url)

# Create test penguin data
test_data = pd.DataFrame({
    "species": ["Adelie", "Chinstrap", "Gentoo"],
    "island": ["Torgersen", "Dream", "Biscoe"],
    "bill_length_mm": [39.1, 48.7, 46.1],
    "bill_depth_mm": [18.7, 15.3, 13.2],
    "flipper_length_mm": [181.0, 196.0, 211.0],
    "body_mass_g": [3750.0, 3800.0, 4500.0],
    "sex": ["MALE", "FEMALE", "MALE"],
    "year": [2007, 2008, 2009]
})

# Convert to xorq table
test_table = xo.memtable(test_data)

# Look up the served UDXF by the command name set in make_udxf_kwargs
udxf = con.get_flight_udxf("penguin_health_analyzer")

# Get the results
results = test_table.pipe(udxf).execute()
print(results[["species", "body_mass_g", "health_score", "size_category", "processed_at"]])

Expected output (pandas elides the middle columns with "...", and processed_at will reflect the time of your run):

     species  body_mass_g  ...  size_category               processed_at
0     Adelie       3750.0  ...         medium 2025-07-14 17:32:22.802505
1  Chinstrap       3800.0  ...         medium 2025-07-14 17:32:22.802505
2     Gentoo       4500.0  ...          large 2025-07-14 17:32:22.802505
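
The printed frame hides the health_score column behind "...". As a cross-check, the scoring rules from analyze_penguin_health can be reproduced in plain Python for the three test rows. This is a sketch that mirrors the logic in the script without pandas; it omits the missing-value handling because the test data has no gaps.

```python
def health_score(body_mass_g: float, bill_length_mm: float) -> float:
    """Mean of the capped mass score and the capped bill-length score."""
    mass_score = min(body_mass_g / 1000, 10)
    bill_score = min(bill_length_mm / 10, 10)
    return (mass_score + bill_score) / 2


def size_category(body_mass_g: float) -> str:
    """Same thresholds as categorize_size in the script."""
    if body_mass_g < 3500:
        return "small"
    if body_mass_g < 4500:
        return "medium"
    return "large"


# (species, body_mass_g, bill_length_mm) taken from the test data.
rows = [
    ("Adelie", 3750.0, 39.1),
    ("Chinstrap", 3800.0, 48.7),
    ("Gentoo", 4500.0, 46.1),
]

for species, mass, bill in rows:
    print(species, round(health_score(mass, bill), 3), size_category(mass))
```

The scores work out to roughly 3.83, 4.335, and 4.555. The Gentoo row is categorized as large because 4500 is not strictly below the 4500 threshold.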

Command Line Options

The serve command supports several options:

# --host: bind to all interfaces
# --port: specific port (default: random)
# --duckdb-path: persistent database file
# --prometheus-port: metrics endpoint (if available)
xorq serve builds/penguins_analysis \
    --host 0.0.0.0 \
    --port 8080 \
    --duckdb-path ./data.db \
    --prometheus-port 9090
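
When the server is launched from a Python script rather than a shell, the same invocation can be assembled as an argument list. The sketch below uses only the flags shown above. The helper name build_serve_command is hypothetical, and options left as None are simply omitted so that the CLI's own defaults apply.

```python
import shlex
from typing import List, Optional


def build_serve_command(
    build_dir: str,
    host: Optional[str] = None,
    port: Optional[int] = None,
    duckdb_path: Optional[str] = None,
    prometheus_port: Optional[int] = None,
) -> List[str]:
    """Assemble an argv list for `xorq serve`, skipping unset options."""
    cmd = ["xorq", "serve", build_dir]
    if host is not None:
        cmd += ["--host", host]
    if port is not None:
        cmd += ["--port", str(port)]
    if duckdb_path is not None:
        cmd += ["--duckdb-path", duckdb_path]
    if prometheus_port is not None:
        cmd += ["--prometheus-port", str(prometheus_port)]
    return cmd


cmd = build_serve_command(
    "builds/penguins_analysis",
    host="0.0.0.0",
    port=8080,
    duckdb_path="./data.db",
    prometheus_port=9090,
)

# Print the shell-quoted form; passing cmd to subprocess.run(cmd, check=True)
# would start the server and block until it stops.
print(shlex.join(cmd))
```

Passing a list instead of a single string avoids shell quoting issues with paths that contain spaces.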

Summary

The xorq serve command is a specialized tool for serving UDXF expressions. It:

  • Only works with expressions containing UDXF nodes
  • Uses FlightServer.from_udxf() internally for optimized serving
  • Automatically extracts UDXF commands from the expression
  • Provides a simple CLI interface for deploying user-defined functions as services

This makes it easy to take any data processing function (like our penguin health analyzer) and deploy it as a high-performance network service that can be called from any Arrow Flight client.