Backend Switching and Caching Guide

This guide demonstrates how to use Xorq’s pipeline operations to switch between backends, cache results to avoid recomputation, and use ParquetStorage for persistent caching. We’ll use the penguins dataset and work across Xorq’s default (DataFusion-based) backend, DuckDB, and PostgreSQL.

Introduction

Xorq’s multi-engine architecture allows you to:

  • Switch seamlessly between different data backends (DuckDB, PostgreSQL, DataFusion, etc.)
  • Cache intermediate results to avoid recomputation
  • Persist data using various storage mechanisms

Let’s explore these capabilities using practical examples.

Setup and Initial Connections

First, let’s set up our environment and create connections to different backends:

import xorq as xo
from xorq import _  # Deferred expression builder for column references
from xorq.caching import ParquetStorage, SourceStorage
from pathlib import Path
import pandas as pd

# Create connections to different backends
con = xo.connect()  # Xorq's main backend (DataFusion-based)
ddb = xo.duckdb.connect()  # DuckDB connection
pg = xo.postgres.connect_env()  # PostgreSQL connection (requires environment setup)

print("Backend connections established:")
print(f"Xorq Backend: {con}")
print(f"DuckDB Backend: {ddb}")
print(f"PostgreSQL Backend: {pg}")
Backend connections established:
Xorq Backend: <xorq.backends.let.Backend object at 0x7f4320427f80>
DuckDB Backend: <xorq.backends.duckdb.Backend object at 0x7f42e5a52f90>
PostgreSQL Backend: <xorq.backends.postgres.Backend object at 0x7f42e60b3050>

Loading the Penguins Dataset

Let’s start by loading the famous penguins dataset and exploring its structure:

# Load penguins dataset using Xorq's examples module
penguins_expr = xo.examples.penguins.fetch(backend=con)

# Display basic information about the dataset
print("Penguins Dataset Schema:")
print(penguins_expr.schema())
print(f"\nDataset shape: {penguins_expr.count().execute()} rows")
print(f"Current backends: {penguins_expr.ls.backends}")

# Preview the data
print("\nFirst 5 rows:")
penguins_expr.head().execute()
Penguins Dataset Schema:
ibis.Schema {
  species            string
  island             string
  bill_length_mm     float64
  bill_depth_mm      float64
  flipper_length_mm  float64
  body_mass_g        float64
  sex                string
  year               int64
}

Row count: 344
Current backends: (<xorq.backends.let.Backend object at 0x7f4320427f80>,)

First 5 rows:
  species     island  bill_length_mm  bill_depth_mm  flipper_length_mm  body_mass_g     sex  year
0  Adelie  Torgersen            39.1           18.7              181.0       3750.0    male  2007
1  Adelie  Torgersen            39.5           17.4              186.0       3800.0  female  2007
2  Adelie  Torgersen            40.3           18.0              195.0       3250.0  female  2007
3  Adelie  Torgersen             NaN            NaN                NaN          NaN    None  2007
4  Adelie  Torgersen            36.7           19.3              193.0       3450.0  female  2007

Data Preprocessing

Before we demonstrate backend switching, let’s clean our data by removing rows with missing values:

# Clean the dataset by filtering out null values
clean_penguins = penguins_expr.filter(
    _.bill_length_mm.notnull(),
    _.bill_depth_mm.notnull(),
    _.flipper_length_mm.notnull(),
    _.body_mass_g.notnull(),
    _.sex.notnull()
)

print(f"Cleaned dataset: {clean_penguins.count().execute()} rows")
print("Dataset is now ready for multi-backend operations!")
Cleaned dataset: 333 rows
Dataset is now ready for multi-backend operations!
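
As a sketch, ibis-style tables also offer a drop_null() helper that expresses the same filter more compactly; whether it is available depends on your xorq version, so treat this as an unverified alternative:

# Equivalent, more compact form of the filter above, assuming the
# ibis-style drop_null() helper is exposed by your xorq version
clean_penguins_alt = penguins_expr.drop_null([
    'bill_length_mm', 'bill_depth_mm', 'flipper_length_mm',
    'body_mass_g', 'sex',
])
print(clean_penguins_alt.count().execute())  # should also print 333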

Backend Switching with into_backend()

Xorq’s into_backend() method moves data between different query engines, using Apache Arrow as the interchange format.
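
Under the hood this is roughly an Arrow round trip: materialize the source expression as Arrow data, then register it with the target engine. The sketch below makes that round trip explicit; it assumes xorq mirrors ibis’s to_pyarrow() and create_table() APIs, and the table name penguins_manual is our own:

# Conceptual sketch of into_backend(): export to a pyarrow.Table, then
# register it with the target engine. into_backend() itself stays
# deferred and streams record batches instead of materializing eagerly.
arrow_table = clean_penguins.to_pyarrow()
manual_tbl = ddb.create_table("penguins_manual", arrow_table)
print(manual_tbl.count().execute())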

Moving Data to DuckDB

Let’s move our cleaned penguins data to DuckDB for processing:

# Move data from Xorq backend to DuckDB
penguins_ddb = clean_penguins.into_backend(ddb, "penguins_clean")

print("Data successfully moved to DuckDB!")
print(f"All the expression backends: {penguins_ddb.ls.backends}")

# Perform DuckDB-specific aggregations
species_stats_ddb = (
    penguins_ddb
    .group_by(['species', 'sex'])
    .agg([
        _.count().name('count'),
        _.bill_length_mm.mean().name('avg_bill_length'),
        _.bill_depth_mm.mean().name('avg_bill_depth'),
        _.flipper_length_mm.mean().name('avg_flipper_length'),
        _.body_mass_g.mean().name('avg_body_mass')
    ])
    .order_by(['species', 'sex'])
)

print("\nSpecies Statistics (computed in DuckDB):")
species_stats_ddb.execute()
Data successfully moved to DuckDB!
Backends in the expression: (<xorq.backends.duckdb.Backend object at 0x7f42e5a52f90>, <xorq.backends.let.Backend object at 0x7f4320427f80>)

Species Statistics (computed in DuckDB):
     species     sex  count  avg_bill_length  avg_bill_depth  avg_flipper_length  avg_body_mass
0     Adelie  female     73        37.257534       17.621918          187.794521    3368.835616
1     Adelie    male     73        40.390411       19.072603          192.410959    4043.493151
2  Chinstrap  female     34        46.573529       17.588235          191.735294    3527.205882
3  Chinstrap    male     34        51.094118       19.252941          199.911765    3938.970588
4     Gentoo  female     58        45.563793       14.237931          212.706897    4679.741379
5     Gentoo    male     61        49.473770       15.718033          221.540984    5484.836066

Moving Data to PostgreSQL

Now let’s demonstrate moving data to PostgreSQL for additional processing:

# Move the aggregated results to PostgreSQL for further analysis
species_stats_pg = species_stats_ddb.into_backend(pg, "species_statistics")

print("Aggregated data moved to PostgreSQL!")
print(f"PostgreSQL backends: {species_stats_pg.ls.backends}")

# Perform additional calculations in PostgreSQL
species_analysis = (
    species_stats_pg
    .mutate(
        bill_ratio=_.avg_bill_length / _.avg_bill_depth,
        body_mass_kg=_.avg_body_mass / 1000,
    )
    .select([
        'species', 'sex', 'count',
        'bill_ratio', 'body_mass_kg'
    ])
)

print("\nSpecies Analysis (computed in PostgreSQL):")
species_analysis.execute()
Aggregated data moved to PostgreSQL!
Backends in the expression: (<xorq.backends.postgres.Backend object at 0x7f42e60b3050>, <xorq.backends.duckdb.Backend object at 0x7f42e5a52f90>, <xorq.backends.let.Backend object at 0x7f4320427f80>)

Species Analysis (computed in PostgreSQL):
     species     sex  count  bill_ratio  body_mass_kg
0     Adelie  female     73    2.114272      3.368836
1     Adelie    male     73    2.117719      4.043493
2  Chinstrap  female     34    2.647993      3.527206
3  Chinstrap    male     34    2.653834      3.938971
4     Gentoo  female     58    3.200170      4.679741
5     Gentoo    male     61    3.147580      5.484836

Caching Pipeline Results

Xorq provides caching mechanisms that avoid recomputation and speed up repeated execution. Let’s explore the different storage types.

ParquetStorage for Persistent Caching

ParquetStorage provides durable persistence by caching results as Parquet files on disk:

from pathlib import Path

# Create ParquetStorage for caching
cache_storage = ParquetStorage(source=con, base_path=Path.cwd() / "cache")

# Cache the cleaned penguins data
cached_penguins = clean_penguins.cache(storage=cache_storage)

print("Data cached using ParquetStorage!")
print(f"Cache storage location: {cache_storage.base_path}")

# Subsequent operations will use cached data
print("\nFirst execution (writes to cache):")
result1 = cached_penguins.count().execute()
print(f"Count: {result1}")

print("\nSecond execution (reads from cache):")
result2 = cached_penguins.count().execute()
print(f"Count: {result2}")
Data cached using ParquetStorage!
Cache storage location: /home/runner/work/xorq/xorq/docs/how_to/cache

First execution (writes to cache):
Count: 333

Second execution (reads from cache):
Count: 333
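
A quick way to confirm the cache landed on disk is to list the Parquet files under base_path with plain pathlib. File names are derived from expression hashes, so yours will differ; depending on the xorq version the files may sit in subdirectories, hence the recursive glob:

# Sanity check: list Parquet files written under the cache directory
for parquet_file in sorted((Path.cwd() / "cache").rglob("*.parquet")):
    print(parquet_file)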

SourceStorage for Backend-Specific Caching

SourceStorage persists the cached result in the source backend and automatically invalidates it when the upstream data changes; the sketch after this example shows how cache keys behave:

# Create SourceStorage using DuckDB as the caching backend
ddb_storage = SourceStorage(source=ddb)

# Cache a complex aggregation using DuckDB
complex_analysis = (
    clean_penguins
    .group_by('species')
    .agg([
        _.bill_length_mm.mean().name('avg_bill_length'),
        _.bill_length_mm.std().name('std_bill_length'),
        _.bill_depth_mm.mean().name('avg_bill_depth'),
        _.bill_depth_mm.std().name('std_bill_depth'),
        _.count().name('sample_count')
    ])
    .mutate(
        bill_length_cv=_.std_bill_length / _.avg_bill_length,
        bill_depth_cv=_.std_bill_depth / _.avg_bill_depth
    )
    .cache(storage=ddb_storage)  # Cache in DuckDB
)

print("Complex analysis cached in DuckDB!")
complex_analysis.execute()
Complex analysis cached in DuckDB!
     species  avg_bill_length  std_bill_length  avg_bill_depth  std_bill_depth  sample_count  bill_length_cv  bill_depth_cv
0  Chinstrap        48.833824         3.339256       18.420588        1.135395            68        0.068380       0.061637
1     Adelie        38.823973         2.662597       18.347260        1.219338           146        0.068581       0.066459
2     Gentoo        47.568067         3.106116       14.996639        0.985998           119        0.065298       0.065748
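
Cache entries are keyed by the expression and its upstream data, so a structurally different pipeline computes and stores its own entry rather than reusing the one above. A small illustration (this variant is ours, not part of the original analysis):

# A different expression hashes to a different cache key, so this is
# computed and cached independently of complex_analysis
adelie_bills = (
    clean_penguins
    .filter(_.species == 'Adelie')
    .group_by('sex')
    .agg(_.bill_length_mm.mean().name('avg_bill_length'))
    .cache(storage=ddb_storage)
)
adelie_bills.execute()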

Chaining Caches Across Multiple Engines

One of Xorq’s powerful features is the ability to chain caches across different backends:

# Create a pipeline with multiple cache points across different backends
pipeline_result = (
    clean_penguins
    .cache(storage=ParquetStorage(con))  # Cache initial data
    .into_backend(ddb, "temp_penguins")  # Move to DuckDB
    .mutate(
        bmi=_.body_mass_g / (_.flipper_length_mm / 1000) ** 2,  # BMI-like metric (g/m²; flipper length converted from mm to m)
        bill_size_index=_.bill_length_mm * _.bill_depth_mm
    )
    .cache(storage=SourceStorage(ddb))  # Cache in DuckDB
    .into_backend(pg, "enriched_penguins")  # Move to PostgreSQL
    .cache(storage=SourceStorage(pg))  # Final cache in PostgreSQL
)

print("Multi-stage pipeline with cross-backend caching created!")
final_result = pipeline_result.execute()
print(f"Final pipeline result: {len(final_result)} rows")
Multi-stage pipeline with cross-backend caching created!
Final pipeline result: 333 rows
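
Because every stage is cached, re-running the pipeline is served from the final PostgreSQL cache instead of being recomputed. A quick, environment-dependent way to observe this:

import time

# Second execution should hit the PostgreSQL cache; absolute timings
# depend entirely on your environment
start = time.perf_counter()
pipeline_result.execute()
print(f"Cached re-execution took {time.perf_counter() - start:.3f}s")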

Xorq’s pipeline operations provide the foundation for robust, scalable data workflows that span multiple backends while avoiding recomputation through caching.