10-minute tour of Xorq

A quick tour of Xorq’s core features. This tutorial covers multi-engine operations, caching, UDXF nodes and Profiles. Xorq also ships with a CLI for quickly bootstrapping the creation, execution and serving of expressions.

Installation

pip install 'xorq[duckdb,examples]'

Working with Data

Xorq leverages Ibis for working with data. For example:

import xorq.api as xo 
from xorq.api import _

penguins = xo.examples.penguins.fetch(backend=xo.pandas.connect())

# Build a query - nothing executes yet
clean_penguins = (
    penguins
    .filter(
        _.bill_length_mm.notnull(),
        _.sex.notnull()
    )
    .select(['species', 'island', 'sex', 'bill_length_mm', 'body_mass_g'])
    .mutate(
        body_mass_kg=_.body_mass_g / 1000
    )
)

# Execute to see results
clean_penguins.execute()
species island sex bill_length_mm body_mass_g body_mass_kg
0 Adelie Torgersen male 39.1 3750.0 3.750
1 Adelie Torgersen female 39.5 3800.0 3.800
2 Adelie Torgersen female 40.3 3250.0 3.250
3 Adelie Torgersen female 36.7 3450.0 3.450
4 Adelie Torgersen male 39.3 3650.0 3.650
... ... ... ... ... ... ...
328 Chinstrap Dream male 55.8 4000.0 4.000
329 Chinstrap Dream female 43.5 3400.0 3.400
330 Chinstrap Dream male 49.6 3775.0 3.775
331 Chinstrap Dream male 50.8 4100.0 4.100
332 Chinstrap Dream female 50.2 3775.0 3.775

333 rows × 6 columns

For more info on data operations and transformations with Ibis, check the Ibis docs.
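
Unlike Xorq's deferred expressions, plain pandas evaluates each step eagerly. For comparison, here is the same filter/select/mutate chain expressed directly in pandas on a toy stand-in DataFrame (hypothetical values, not the real penguins data):

```python
import pandas as pd

# Toy stand-in for the penguins table (hypothetical values)
df = pd.DataFrame({
    "species": ["Adelie", "Adelie", "Gentoo"],
    "island": ["Torgersen", "Torgersen", "Biscoe"],
    "sex": ["male", None, "female"],
    "bill_length_mm": [39.1, 39.5, None],
    "body_mass_g": [3750.0, 3800.0, 5000.0],
})

# Eager equivalent of the deferred filter/select/mutate chain above:
# drop rows with missing bill length or sex, keep a subset of columns,
# and derive body mass in kilograms
clean = (
    df.dropna(subset=["bill_length_mm", "sex"])
      .loc[:, ["species", "island", "sex", "bill_length_mm", "body_mass_g"]]
      .assign(body_mass_kg=lambda t: t["body_mass_g"] / 1000)
)
```

Each pandas call above runs immediately, whereas the Xorq chain only records the operations until `.execute()` is called.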

Multi-Engine Operations

A core feature of Xorq is the ability to move data between backends using into_backend():

# Move pandas data to DuckDB
ddb = xo.duckdb.connect()
ddb_penguins = clean_penguins.into_backend(ddb, "penguins_clean")

# Now use DuckDB for analytics
species_stats = (
    ddb_penguins
    .group_by(['species', 'island'])
    .agg([
        _.bill_length_mm.mean().name('avg_bill_length'),
        _.body_mass_g.mean().name('avg_body_mass'),
        _.count().name('penguin_count')
    ])
)

# Join across the same backend
enriched = (
    ddb_penguins
    .join(species_stats, ['species', 'island'], how='left')
    .mutate(
        bill_vs_avg=_.bill_length_mm - _.avg_bill_length,
        mass_vs_avg=_.body_mass_g - _.avg_body_mass
    )
)

enriched.execute()
species island sex bill_length_mm body_mass_g body_mass_kg species_right island_right avg_bill_length avg_body_mass penguin_count bill_vs_avg mass_vs_avg
0 Adelie Torgersen male 39.1 3750.0 3.750 Adelie Torgersen 39.038298 3708.510638 47 0.061702 41.489362
1 Adelie Torgersen female 39.5 3800.0 3.800 Adelie Torgersen 39.038298 3708.510638 47 0.461702 91.489362
2 Adelie Torgersen female 40.3 3250.0 3.250 Adelie Torgersen 39.038298 3708.510638 47 1.261702 -458.510638
3 Adelie Torgersen female 36.7 3450.0 3.450 Adelie Torgersen 39.038298 3708.510638 47 -2.338298 -258.510638
4 Adelie Torgersen male 39.3 3650.0 3.650 Adelie Torgersen 39.038298 3708.510638 47 0.261702 -58.510638
... ... ... ... ... ... ... ... ... ... ... ... ... ...
328 Chinstrap Dream male 55.8 4000.0 4.000 Chinstrap Dream 48.833824 3733.088235 68 6.966176 266.911765
329 Chinstrap Dream female 43.5 3400.0 3.400 Chinstrap Dream 48.833824 3733.088235 68 -5.333824 -333.088235
330 Chinstrap Dream male 49.6 3775.0 3.775 Chinstrap Dream 48.833824 3733.088235 68 0.766176 41.911765
331 Chinstrap Dream male 50.8 4100.0 4.100 Chinstrap Dream 48.833824 3733.088235 68 1.966176 366.911765
332 Chinstrap Dream female 50.2 3775.0 3.775 Chinstrap Dream 48.833824 3733.088235 68 1.366176 41.911765

333 rows × 13 columns
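
The group-by/join pattern above (per-group averages joined back onto each row) has a well-known eager counterpart in pandas using `groupby(...).transform`, sketched here on toy values for comparison:

```python
import pandas as pd

# Toy cleaned table (hypothetical values)
df = pd.DataFrame({
    "species": ["Adelie", "Adelie", "Chinstrap"],
    "island": ["Torgersen", "Torgersen", "Dream"],
    "bill_length_mm": [39.0, 41.0, 50.0],
    "body_mass_g": [3700.0, 3900.0, 3800.0],
})

# Broadcast per-group averages back to every row, then compute the
# deviation columns, mirroring bill_vs_avg / mass_vs_avg above
grouped = df.groupby(["species", "island"])
enriched = df.assign(
    avg_bill_length=grouped["bill_length_mm"].transform("mean"),
    avg_body_mass=grouped["body_mass_g"].transform("mean"),
)
enriched = enriched.assign(
    bill_vs_avg=enriched["bill_length_mm"] - enriched["avg_bill_length"],
    mass_vs_avg=enriched["body_mass_g"] - enriched["avg_body_mass"],
)
```

In Xorq the same logic stays deferred and can execute on whichever backend (here DuckDB) holds the data.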

Caching

Xorq provides a caching mechanism that can write to Parquet files or to a source backend (DuckDB, Postgres) to avoid re-computation:

from pathlib import Path
from xorq.caching import ParquetStorage

con = xo.connect()

# Set up caching
cache_storage = ParquetStorage(source=con, base_path=Path.cwd())

# Cache the result
cached_enriched = enriched.cache(storage=cache_storage)

# First execution computes and caches
result1 = cached_enriched.execute()

# Second execution uses cache
result2 = cached_enriched.execute()  # Much faster
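
The underlying idea is content-addressed memoization: a deterministic key derived from the expression decides whether a stored result can be reused. A simplified in-memory sketch of that idea (not Xorq's actual implementation) looks like this:

```python
import hashlib

_cache = {}

def cached_execute(query: str, compute):
    """Compute once per distinct query; reuse the stored result afterwards.
    The key is a content hash of the query text -- a simplified stand-in
    for Xorq's expression hashing."""
    key = hashlib.sha256(query.encode()).hexdigest()[:12]
    if key not in _cache:
        _cache[key] = compute()
    return _cache[key]

calls = []
result1 = cached_execute("select 1", lambda: calls.append(1) or 42)
result2 = cached_execute("select 1", lambda: calls.append(1) or 42)  # cache hit
```

The second call returns the stored value without recomputing, which is why `result2` in the Xorq example above comes back much faster.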

UDXF (User-Defined Exchange Functions)

UDXFs are user-defined functions that spin up an ephemeral Flight server and execute through a DoExchange call:

import pandas as pd

import xorq.api as xo

penguins = xo.examples.penguins.fetch(deferred=False)


def standardize_penguins(df: pd.DataFrame) -> pd.DataFrame:
    """
    Return a new DataFrame with penguin measurements standardized.
    """
    return df.dropna(
        subset=["bill_length_mm", "bill_depth_mm", "body_mass_g"], ignore_index=True
    ).assign(
        **{
            "bill_length_std": lambda t: t["bill_length_mm"]
            .sub(t["bill_length_mm"].mean())
            .div(t["bill_length_mm"].std()),
            "body_mass_std": lambda t: t["body_mass_g"]
            .sub(t["body_mass_g"].mean())
            .div(t["body_mass_g"].std()),
        }
    )


expr = penguins.select(
    "bill_length_mm", "bill_depth_mm", "flipper_length_mm", "body_mass_g"
)


my_udxf = xo.expr.relations.flight_udxf(
    process_df=standardize_penguins,
    maybe_schema_in=expr.schema(),
    maybe_schema_out=expr.schema()
    | xo.schema({"bill_length_std": "float", "body_mass_std": "float"}),
)

expr.pipe(my_udxf).execute()
bill_length_mm bill_depth_mm flipper_length_mm body_mass_g bill_length_std body_mass_std
0 39.1 18.7 181.0 3750.0 -0.883205 -0.563317
1 39.5 17.4 186.0 3800.0 -0.809939 -0.500969
2 40.3 18.0 195.0 3250.0 -0.663408 -1.186793
3 36.7 19.3 193.0 3450.0 -1.322799 -0.937403
4 39.3 20.6 190.0 3650.0 -0.846572 -0.688012
... ... ... ... ... ... ...
337 55.8 19.8 207.0 4000.0 2.175637 -0.251578
338 43.5 18.1 202.0 3400.0 -0.077282 -0.999750
339 49.6 18.2 193.0 3775.0 1.040019 -0.532143
340 50.8 19.0 210.0 4100.0 1.259816 -0.126883
341 50.2 18.7 198.0 3775.0 1.149917 -0.532143

342 rows × 6 columns
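
The transformation inside the UDXF is an ordinary pandas function. The z-score it applies (subtract the column mean, divide by the sample standard deviation) can be sketched in isolation on toy values:

```python
import pandas as pd

def standardize(df: pd.DataFrame, column: str) -> pd.Series:
    """Classic z-score: subtract the mean, divide by the sample std,
    mirroring the bill_length_std / body_mass_std columns above."""
    return df[column].sub(df[column].mean()).div(df[column].std())

# Toy values chosen so the result is exact: mean 3500, sample std 500
toy = pd.DataFrame({"body_mass_g": [3000.0, 3500.0, 4000.0]})
z = standardize(toy, "body_mass_g")
```

Because the function takes and returns a plain DataFrame, the same code works unchanged whether it runs locally or behind a Flight server.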

Profiles

For persisting connections, Xorq provides the Profiles API. A Profile is a representation of a database connection that can be saved and loaded:

import xorq.api as xo
from xorq.api import _  # column references
from xorq.vendor.ibis.backends.profiles import Profile

# Create a Profile from pandas connection
pandas_profile = Profile.from_con(xo.pandas.connect())

profile = Profile(
    con_name='postgres',
    kwargs_tuple=(
        ('host', '${POSTGRES_HOST}'),
        ('port', 5432),
        ('database', 'ibis_testing'),
        ('user', '${POSTGRES_USER}'),
        ('password', '${POSTGRES_PASSWORD}'),
    ),
)

path = pandas_profile.save(alias="pandas_example", clobber=True)  # clobber for overwriting

The Profiles API supports serialization to and deserialization from YAML, environment variable substitution, and security checks to prevent sensitive information from being stored in plain text.
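
The `${VAR}` placeholders above are resolved from the environment at load time, so credentials never need to live in the serialized file. A simplified sketch of that substitution idea using only the standard library (not Xorq's internal mechanism):

```python
import os
from string import Template

# For demonstration only: pretend the deployment environment sets this
os.environ["POSTGRES_HOST"] = "localhost"

# A profile value as it would appear in the serialized YAML
stored_value = "${POSTGRES_HOST}"

# Resolve the placeholder against the environment at load time
host = Template(stored_value).substitute(os.environ)
```

Storing the placeholder instead of the secret is what lets the same saved Profile work across environments.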

Xorq CLI Commands

The xorq CLI provides powerful commands to build, run, and serve data pipelines. This section covers the essential commands you’ll use in your workflow.

Overview

Xorq separates pipeline definition from execution through its CLI:

xorq <command> [options]

Core Commands

xorq init – Initialize a Project

Quickly bootstrap a new Xorq project from templates:

# Use the Penguins ML template
xorq init -t penguins -p penguins_example

xorq build - Compile Expressions

Convert Python expressions into serialized artifacts:

# Basic build
xorq build penguins_example/expr.py

You should see output similar to the following:

Building expr from penguins_example/expr.py
Written 'expr' to builds/5704ffbeade4
builds/5704ffbeade4

This means the artifact for the variable named expr in the file expr.py was written to
builds/5704ffbeade4.
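
Directory names like `builds/5704ffbeade4` look like short content hashes, so identical builds map to identical paths. A sketch of that content-addressing idea (Xorq's actual hashing scheme may differ):

```python
import hashlib

def build_dir_name(artifact_bytes: bytes) -> str:
    """Derive a short, deterministic directory name from the artifact's
    content: the same serialized expression always yields the same name."""
    return hashlib.sha256(artifact_bytes).hexdigest()[:12]

# Building the same serialized expression twice gives the same path
name_a = build_dir_name(b"serialized expression")
name_b = build_dir_name(b"serialized expression")
```

Deterministic names make builds reproducible and easy to cache or reference from `xorq run`.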

xorq run – Execute Pipelines

Run compiled expressions with various output options:

# Run and display results
xorq run builds/5704ffbeade4

The above command runs the serialized artifact and discards the result. To permanently
save the output, use:

# Save to file
xorq run builds/5704ffbeade4 -o results.parquet

xorq serve-unbound – Deploy as Flight Server

Serve your pipeline as an Arrow Flight endpoint:

# Basic server
xorq serve-unbound builds/5704ffbeade4 b2370a29c19df8e1e639c63252dacd0e

xorq serve-flight-udxf

The xorq serve-flight-udxf command serves expressions that contain UDXF (User-Defined Exchange Functions) nodes via Apache Arrow Flight.

For this command, we’ll use the following expr.py file:

import pandas as pd

import xorq.api as xo

penguins = xo.examples.penguins.fetch(deferred=False)


def standardize_penguins(df: pd.DataFrame) -> pd.DataFrame:
    """
    Return a new DataFrame with penguin measurements standardized.
    """
    return (
        df
        .dropna(subset=["bill_length_mm", "bill_depth_mm", "body_mass_g"], ignore_index=True)
        .assign(**{
            "bill_length_std": lambda t: t["bill_length_mm"].sub(t["bill_length_mm"].mean()).div(t["bill_length_mm"].std()),
            "body_mass_std": lambda t: t["body_mass_g"].sub(t["body_mass_g"].mean()).div(t["body_mass_g"].std()),
        })
    )


expr = penguins.select(
    "bill_length_mm", "bill_depth_mm", "flipper_length_mm", "body_mass_g"
)


my_udxf = xo.expr.relations.flight_udxf(
    process_df=standardize_penguins,
    maybe_schema_in=expr.schema(),
    maybe_schema_out=expr.schema()
    | xo.schema({"bill_length_std": "float", "body_mass_std": "float"}),
)

pipeline = expr.pipe(my_udxf)


if __name__ == "__pytest_main__":
    pipeline.execute()

First, let’s rebuild the expression:

xorq build penguins_example/expr.py

This produces the following output:

Building expr from penguins_example/expr.py
Written 'expr' to builds/cb63fef71027
builds/cb63fef71027

Now we can serve the expression:

xorq serve-flight-udxf builds/cb63fef71027

Summary

You’ve seen:

  • Multi-engine operations for moving data between backends with into_backend()
  • Caching for deferring and storing expensive computation results
  • UDXF nodes for creating reusable transformation functions
  • Profiles for managing backend connections
  • CLI Commands for building, running and serving expressions

Next steps: Explore core concepts or learn more about UDXF nodes.