10-minute tour of Xorq

A quick tour of Xorq’s core features. This tutorial covers multi-engine operations, caching, UDXF nodes and Profiles.

Installation

pip install 'xorq[duckdb,examples]'

Working with Data

Xorq leverages Ibis for working with data. For example:

import xorq as xo 
from xorq import _

penguins = xo.examples.penguins.fetch(backend=xo.pandas.connect())

# Build a query - nothing executes yet
clean_penguins = (
    penguins
    .filter(
        _.bill_length_mm.notnull(),
        _.sex.notnull()
    )
    .select(['species', 'island', 'sex', 'bill_length_mm', 'body_mass_g'])
    .mutate(
        body_mass_kg=_.body_mass_g / 1000
    )
)

# Execute to see results
clean_penguins.execute()
species island sex bill_length_mm body_mass_g body_mass_kg
0 Adelie Torgersen male 39.1 3750.0 3.750
1 Adelie Torgersen female 39.5 3800.0 3.800
2 Adelie Torgersen female 40.3 3250.0 3.250
3 Adelie Torgersen female 36.7 3450.0 3.450
4 Adelie Torgersen male 39.3 3650.0 3.650
... ... ... ... ... ... ...
328 Chinstrap Dream male 55.8 4000.0 4.000
329 Chinstrap Dream female 43.5 3400.0 3.400
330 Chinstrap Dream male 49.6 3775.0 3.775
331 Chinstrap Dream male 50.8 4100.0 4.100
332 Chinstrap Dream female 50.2 3775.0 3.775

333 rows × 6 columns
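Because the query is deferred, you can inspect it before running it. A minimal sketch: schema() (from the vendored Ibis API) resolves column names and types from the expression graph without executing anything:

# Nothing runs here: the schema is derived from the expression itself
print(clean_penguins.schema())

# Execution returns a regular pandas DataFrame
df = clean_penguins.execute()
print(type(df))  # <class 'pandas.core.frame.DataFrame'>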

For more info on data operations and transformations with Ibis, check the Ibis docs.

Multi-Engine Operations

A core feature of Xorq is the ability to move data between backends using into_backend():

# Move pandas data to DuckDB
ddb = xo.duckdb.connect()
ddb_penguins = clean_penguins.into_backend(ddb, "penguins_clean")

# Now use DuckDB for analytics
species_stats = (
    ddb_penguins
    .group_by(['species', 'island'])
    .agg([
        _.bill_length_mm.mean().name('avg_bill_length'),
        _.body_mass_g.mean().name('avg_body_mass'),
        _.count().name('penguin_count')
    ])
)

# Join across the same backend
enriched = (
    ddb_penguins
    .join(species_stats, ['species', 'island'], how='left')
    .mutate(
        bill_vs_avg=_.bill_length_mm - _.avg_bill_length,
        mass_vs_avg=_.body_mass_g - _.avg_body_mass
    )
)

enriched.execute()
species island sex bill_length_mm body_mass_g body_mass_kg species_right island_right avg_bill_length avg_body_mass penguin_count bill_vs_avg mass_vs_avg
0 Adelie Torgersen male 39.1 3750.0 3.750 Adelie Torgersen 39.038298 3708.510638 47 0.061702 41.489362
1 Adelie Torgersen female 39.5 3800.0 3.800 Adelie Torgersen 39.038298 3708.510638 47 0.461702 91.489362
2 Adelie Torgersen female 40.3 3250.0 3.250 Adelie Torgersen 39.038298 3708.510638 47 1.261702 -458.510638
3 Adelie Torgersen female 36.7 3450.0 3.450 Adelie Torgersen 39.038298 3708.510638 47 -2.338298 -258.510638
4 Adelie Torgersen male 39.3 3650.0 3.650 Adelie Torgersen 39.038298 3708.510638 47 0.261702 -58.510638
... ... ... ... ... ... ... ... ... ... ... ... ... ...
328 Chinstrap Dream male 55.8 4000.0 4.000 Chinstrap Dream 48.833824 3733.088235 68 6.966176 266.911765
329 Chinstrap Dream female 43.5 3400.0 3.400 Chinstrap Dream 48.833824 3733.088235 68 -5.333824 -333.088235
330 Chinstrap Dream male 49.6 3775.0 3.775 Chinstrap Dream 48.833824 3733.088235 68 0.766176 41.911765
331 Chinstrap Dream male 50.8 4100.0 4.100 Chinstrap Dream 48.833824 3733.088235 68 1.966176 366.911765
332 Chinstrap Dream female 50.2 3775.0 3.775 Chinstrap Dream 48.833824 3733.088235 68 1.366176 41.911765

333 rows × 13 columns
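into_backend() also works on computed results, so an expression can keep moving between engines. A minimal sketch, assuming the xorq embedded backend returned by xo.connect() (the table name enriched_local is illustrative):

# Hand the DuckDB result to xorq's embedded backend and keep composing
local_con = xo.connect()
local_enriched = enriched.into_backend(local_con, "enriched_local")

heaviest = (
    local_enriched
    .order_by(_.body_mass_g.desc())
    .limit(5)
)
heaviest.execute()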

Caching

Xorq provides a caching mechanism that can write to Parquet files or to a source backend (e.g., DuckDB or Postgres) to avoid re-computation:

from pathlib import Path
from xorq.caching import ParquetStorage

con = xo.connect()

# Set up caching
cache_storage = ParquetStorage(source=con, base_path=Path.cwd())

# Cache the result
cached_enriched = enriched.cache(storage=cache_storage)

# First execution computes and caches
result1 = cached_enriched.execute()

# Second execution uses cache
result2 = cached_enriched.execute()  # Much faster
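Caching into a source backend works the same way. A minimal sketch, assuming xorq.caching.SourceStorage and reusing the DuckDB connection from above:

from xorq.caching import SourceStorage

# Cache into DuckDB instead of Parquet: the result is materialized as a
# table in the source backend and reused on subsequent executions
ddb_storage = SourceStorage(source=ddb)
cached_in_ddb = enriched.cache(storage=ddb_storage)
cached_in_ddb.execute()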

UDXF (User-Defined Exchange Functions)

UDXFs are user-defined functions that spin up an ephemeral Flight server and execute the transformation through a DoExchange call:

import pandas as pd

import xorq as xo

penguins = xo.examples.penguins.fetch(deferred=False)


def standardize_penguins(df: pd.DataFrame) -> pd.DataFrame:
    """
    Return a new DataFrame with penguin measurements standardized.
    """
    return df.dropna(
        subset=["bill_length_mm", "bill_depth_mm", "body_mass_g"], ignore_index=True
    ).assign(
        **{
            "bill_length_std": lambda t: t["bill_length_mm"]
            .sub(t["bill_length_mm"].mean())
            .div(t["bill_length_mm"].std()),
            "body_mass_std": lambda t: t["body_mass_g"]
            .sub(t["body_mass_g"].mean())
            .div(t["body_mass_g"].std()),
        }
    )


expr = penguins.select(
    "bill_length_mm", "bill_depth_mm", "flipper_length_mm", "body_mass_g"
)


my_udxf = xo.expr.relations.flight_udxf(
    process_df=standardize_penguins,
    maybe_schema_in=expr.schema(),
    maybe_schema_out=expr.schema()
    | xo.schema({"bill_length_std": "float", "body_mass_std": "float"}),
)

expr.pipe(my_udxf).execute()
bill_length_mm bill_depth_mm flipper_length_mm body_mass_g bill_length_std body_mass_std
0 39.1 18.7 181.0 3750.0 -0.883205 -0.563317
1 39.5 17.4 186.0 3800.0 -0.809939 -0.500969
2 40.3 18.0 195.0 3250.0 -0.663408 -1.186793
3 36.7 19.3 193.0 3450.0 -1.322799 -0.937403
4 39.3 20.6 190.0 3650.0 -0.846572 -0.688012
... ... ... ... ... ... ...
337 55.8 19.8 207.0 4000.0 2.175637 -0.251578
338 43.5 18.1 202.0 3400.0 -0.077282 -0.999750
339 49.6 18.2 193.0 3775.0 1.040019 -0.532143
340 50.8 19.0 210.0 4100.0 1.259816 -0.126883
341 50.2 18.7 198.0 3775.0 1.149917 -0.532143

342 rows × 6 columns
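Because the UDXF is an ordinary node, it can be piped onto any expression whose schema matches maybe_schema_in. A minimal sketch reusing my_udxf on a filtered subset (the species filter is illustrative):

from xorq import _

# Reuse the same UDXF; only the input schema has to match
gentoo = (
    penguins
    .filter(_.species == "Gentoo")
    .select("bill_length_mm", "bill_depth_mm", "flipper_length_mm", "body_mass_g")
)
gentoo.pipe(my_udxf).execute()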

Profiles

For persisting connections, Xorq provides the Profiles API. A Profile is a representation of a database connection that can be saved and loaded:

import xorq as xo
from xorq import _  # column references
from xorq.vendor.ibis.backends.profiles import Profile

# Create a Profile from pandas connection
pandas_profile = Profile.from_con(xo.pandas.connect())

profile = Profile(
    con_name='postgres',
    kwargs_tuple=(
        ('host', '${POSTGRES_HOST}'),
        ('port', 5432),
        ('database', 'ibis_testing'),
        ('user', '${POSTGRES_USER}'),
        ('password', '${POSTGRES_PASSWORD}'),
    ),
)

path = pandas_profile.save(alias="pandas_example", clobber=True)  # clobber=True overwrites an existing alias

The Profiles API supports serialization to and deserialization from YAML, environment variable substitution, and security checks to prevent sensitive information from being stored in plain text.
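A saved Profile can later be restored by alias and turned back into a live connection. A minimal sketch, assuming Profile.load and get_con from the Profiles API:

# Restore the saved profile and re-create the connection
restored = Profile.load("pandas_example")
restored_con = restored.get_con()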

Summary

You’ve seen:

  • Multi-engine operations for moving data between backends with into_backend()
  • Deferred caching for storing expensive computation results
  • UDXF nodes for creating reusable transformation functions
  • Profiles for managing backend connections

Next steps: Explore core concepts or learn more about UDXF nodes.