Backend switching and caching guide
This guide demonstrates how to use Xorq’s pipeline operations to switch between backends, cache results to avoid recomputation, and use ParquetStorage for persistent caching. We’ll use the penguins dataset and work with Xorq’s default (DataFusion-based) backend, DuckDB, and PostgreSQL.
Introduction
Xorq’s multi-engine architecture allows you to:
- Switch seamlessly between different data backends (DuckDB, PostgreSQL, DataFusion, etc.)
- Cache intermediate results to avoid recomputation
- Persist data using various storage mechanisms
Let’s explore these capabilities using practical examples.
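To make these ideas concrete before the detailed walkthrough, here is a minimal sketch of the pattern this guide builds up to; the table name and the choice of a Parquet cache here are illustrative rather than prescriptive:
import xorq.api as xo
from xorq.api import _
from xorq.caching import ParquetCache
# Connect, cache, move between engines, then aggregate and execute
con = xo.connect()                                  # default DataFusion-based backend
ddb = xo.duckdb.connect()                           # DuckDB backend
penguins = xo.examples.penguins.fetch(backend=con)  # load the example dataset
species_counts = (
    penguins
    .cache(ParquetCache.from_kwargs(source=con))    # persist the intermediate result as Parquet
    .into_backend(ddb, "penguins_preview")          # move the data into DuckDB
    .group_by("species")
    .agg([_.count().name("n")])                     # aggregate in DuckDB
)
print(species_counts.execute())
Each of these steps (connecting, moving data with into_backend(), and caching) is covered in its own section below.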
Setup and initial connections
First, let’s set up our environment and create connections to different backends:
import xorq.api as xo
from xorq.api import _ # Column reference accessor
from xorq.caching import ParquetCache, SourceCache
from pathlib import Path
import pandas as pd
# Create connections to different backends
con = xo.connect() # Xorq's main backend (DataFusion-based)
ddb = xo.duckdb.connect() # DuckDB connection
pg = xo.postgres.connect_env() # PostgreSQL connection (requires environment setup)
print("Backend connections established:")
print(f"Xorq Backend: {con}")
print(f"DuckDB Backend: {ddb}")
print(f"PostgreSQL Backend: {pg}")
Loading the penguins dataset
Let’s start by loading the famous penguins dataset and exploring its structure:
# Load penguins dataset using Xorq's examples module
penguins_expr = xo.examples.penguins.fetch(backend=con)
# Display basic information about the dataset
print("Penguins Dataset Schema:")
print(penguins_expr.schema())
print(f"\nDataset shape: {penguins_expr.count().execute()} rows")
print(f"Current backends: {penguins_expr.ls.backends}")
# Preview the data
print("\nFirst 5 rows:")
penguins_expr.head().execute()
Data preprocessing
Before we demonstrate backend switching, let’s clean our data by removing rows with missing values:
# Clean the dataset by filtering out null values
clean_penguins = penguins_expr.filter(
_.bill_length_mm.notnull(),
_.bill_depth_mm.notnull(),
_.flipper_length_mm.notnull(),
_.body_mass_g.notnull(),
_.sex.notnull()
)
print(f"Cleaned dataset: {clean_penguins.count().execute()} rows")
print("Dataset is now ready for multi-backend operations!")
Backend switching with into_backend()
Xorq’s into_backend() method moves data seamlessly between query engines, using Apache Arrow as the intermediate format.
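To build intuition for what into_backend() is doing, the sketch below shows a rough manual equivalent: materialize the expression locally and register the result as a table in the target backend. This assumes the DuckDB connection exposes an Ibis-style create_table(), and the table name is illustrative; into_backend() achieves the same effect without the explicit pandas round-trip, handing the data to the target engine in Arrow format and returning a lazy expression.
# Rough manual equivalent of into_backend() (illustrative; assumes an
# Ibis-style create_table() on the DuckDB connection)
df = clean_penguins.execute()                            # materialize locally as a DataFrame
manual_copy = ddb.create_table("penguins_manual", df)    # register the result in DuckDB
# into_backend() skips the explicit materialization and keeps the result
# as a deferred expression bound to the target backend.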
Moving data to DuckDB
Let’s move our cleaned penguins data to DuckDB for processing:
# Move data from Xorq backend to DuckDB
penguins_ddb = clean_penguins.into_backend(ddb, "penguins_clean")
print("Data successfully moved to DuckDB!")
print(f"All the expression backends: {penguins_ddb.ls.backends}")
# Perform DuckDB-specific aggregations
species_stats_ddb = (
penguins_ddb
.group_by(['species', 'sex'])
.agg([
_.count().name('count'),
_.bill_length_mm.mean().name('avg_bill_length'),
_.bill_depth_mm.mean().name('avg_bill_depth'),
_.flipper_length_mm.mean().name('avg_flipper_length'),
_.body_mass_g.mean().name('avg_body_mass')
])
.order_by(['species', 'sex'])
)
print("\nSpecies Statistics (computed in DuckDB):")
species_stats_ddb.execute()
Moving data to PostgreSQL
Now let’s demonstrate moving data to PostgreSQL for additional processing:
# Move the aggregated results to PostgreSQL for further analysis
species_stats_pg = species_stats_ddb.into_backend(pg, "species_statistics")
print("Aggregated data moved to PostgreSQL!")
print(f"PostgreSQL backends: {species_stats_pg.ls.backends}")
# Perform additional calculations in PostgreSQL
species_analysis = (
species_stats_pg
.mutate(
bill_ratio=_.avg_bill_length / _.avg_bill_depth,
body_mass_kg=_.avg_body_mass / 1000,
)
.select([
'species', 'sex', 'count',
'bill_ratio', 'body_mass_kg'
])
)
print("\nSpecies Analysis (computed in PostgreSQL):")
species_analysis.execute()
Caching pipeline results
Xorq provides sophisticated caching mechanisms to optimize performance and avoid recomputation. Let’s explore different storage types.
ParquetStorage for persistent caching
ParquetStorage provides durable persistence by caching results as Parquet files on disk:
from pathlib import Path
# Create ParquetStorage for caching
cache = ParquetCache.from_kwargs(source=con, base_path=Path.cwd() / "cache")
# Cache the cleaned penguins data
cached_penguins = clean_penguins.cache(cache)
print("Data cached using ParquetStorage!")
print(f"Cache storage location: {cache.storage.base_path}")
# Subsequent operations will use cached data
print("\nFirst execution (writes to cache):")
result1 = cached_penguins.count().execute()
print(f"Count: {result1}")
print("\nSecond execution (reads from cache):")
result2 = cached_penguins.count().execute()
print(f"Count: {result2}")
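If you want to confirm that the cache was written to disk, you can list the Parquet files under the cache directory created above. The exact file names and directory layout are managed by Xorq, so treat this as a quick sanity check rather than a stable interface:
# Peek at the on-disk cache (file names and layout are an internal detail)
cache_dir = Path.cwd() / "cache"
print(sorted(p.name for p in cache_dir.rglob("*.parquet")))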
SourceStorage for backend-specific caching
SourceStorage uses the source backend for persistence and automatically invalidates the cache when the upstream data changes:
# Create SourceStorage using DuckDB as the caching backend
ddb_storage = SourceCache.from_kwargs(source=ddb)
# Cache a complex aggregation using DuckDB
complex_analysis = (
clean_penguins
.group_by('species')
.agg([
_.bill_length_mm.mean().name('avg_bill_length'),
_.bill_length_mm.std().name('std_bill_length'),
_.bill_depth_mm.mean().name('avg_bill_depth'),
_.bill_depth_mm.std().name('std_bill_depth'),
_.count().name('sample_count')
])
.mutate(
bill_length_cv=_.std_bill_length / _.avg_bill_length,
bill_depth_cv=_.std_bill_depth / _.avg_bill_depth
)
.cache(ddb_storage) # Cache in DuckDB
)
print("Complex analysis cached in DuckDB!")
complex_analysis.execute()
Chaining caches across multiple engines
One of Xorq’s powerful features is the ability to chain caches across different backends:
# Create a pipeline with multiple cache points across different backends
pipeline_result = (
clean_penguins
.cache(ParquetCache.from_kwargs(source=con)) # Cache initial data
.into_backend(ddb, "temp_penguins") # Move to DuckDB
.mutate(
bmi=_.body_mass_g / (_.flipper_length_mm / 1000) ** 2, # BMI-like metric: body mass (g) per squared flipper length (m)
bill_size_index=_.bill_length_mm * _.bill_depth_mm
)
.cache(SourceCache.from_kwargs(source=ddb)) # Cache in DuckDB
.into_backend(pg, "enriched_penguins") # Move to PostgreSQL
.cache(SourceCache.from_kwargs(source=pg)) # Final cache in PostgreSQL
)
print("Multi-stage pipeline with cross-backend caching created!")
final_result = pipeline_result.execute()
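# Since each stage runs on a different engine, it can help to confirm which
# backends the final expression touches (same .ls accessor used earlier in this guide)
print(f"Backends in the final expression: {pipeline_result.ls.backends}")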
print(f"Final pipeline result: {len(final_result)} rows")
Xorq’s pipeline operations provide the foundation for building robust, scalable data processing workflows that use multiple backends efficiently and avoid recomputation through caching.