This guide shows how to use Xorq’s pipeline operations to switch between backends, cache intermediate results to avoid recomputation, and use ParquetStorage for persistent on-disk caching. We’ll work with the penguins dataset across Xorq’s default backend, DuckDB, and PostgreSQL.
Introduction
Xorq’s multi-engine architecture allows you to:
Switch seamlessly between different data backends (DuckDB, PostgreSQL, DataFusion, etc.)
Cache intermediate results to avoid recomputation
Persist data using various storage mechanisms
Let’s explore these capabilities using practical examples.
Setup and Initial Connections
First, let’s set up our environment and create connections to different backends:
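Here is a minimal sketch of the setup. The exact import paths and connection helpers (xo.connect(), xo.duckdb.connect(), xo.postgres.connect()) are assumptions inferred from the calls used later in this guide, the import path for ParquetStorage and SourceStorage is assumed, and the PostgreSQL credentials are placeholders to replace with your own.

import xorq as xo                                        # assumed top-level import used as `xo` throughout
from xorq import _                                       # assumed home of the deferred column helper used below
from xorq.caching import ParquetStorage, SourceStorage   # assumed import path for the storage classes used later

# Connect to the three backends used in this guide
con = xo.connect()          # Xorq's default backend
ddb = xo.duckdb.connect()   # in-memory DuckDB
pg = xo.postgres.connect(   # placeholder credentials; replace with your own
    host="localhost",
    user="postgres",
    password="postgres",
    database="postgres",
)

print("Backend connections established:")
print(f"Xorq Backend: {con}")
print(f"DuckDB Backend: {ddb}")
print(f"PostgreSQL Backend: {pg}")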
Backend connections established:
Xorq Backend: <xorq.backends.let.Backend object at 0x7f4320427f80>
DuckDB Backend: <xorq.backends.duckdb.Backend object at 0x7f42e5a52f90>
PostgreSQL Backend: <xorq.backends.postgres.Backend object at 0x7f42e60b3050>
Loading the Penguins Dataset
Let’s start by loading the famous penguins dataset and exploring its structure:
# Load penguins dataset using Xorq's examples module
penguins_expr = xo.examples.penguins.fetch(backend=con)

# Display basic information about the dataset
print("Penguins Dataset Schema:")
print(penguins_expr.schema())
print(f"\nDataset shape: {penguins_expr.count().execute()} rows")
print(f"Current backends: {penguins_expr.ls.backends}")

# Preview the data
print("\nFirst 5 rows:")
penguins_expr.head().execute()
Penguins Dataset Schema:
ibis.Schema {
  species            string
  island             string
  bill_length_mm     float64
  bill_depth_mm      float64
  flipper_length_mm  float64
  body_mass_g        float64
  sex                string
  year               int64
}
Dataset shape: 344 rows
Current backends: (<xorq.backends.let.Backend object at 0x7f4320427f80>,)
First 5 rows:
  species     island  bill_length_mm  bill_depth_mm  flipper_length_mm  body_mass_g     sex  year
0  Adelie  Torgersen            39.1           18.7              181.0       3750.0    male  2007
1  Adelie  Torgersen            39.5           17.4              186.0       3800.0  female  2007
2  Adelie  Torgersen            40.3           18.0              195.0       3250.0  female  2007
3  Adelie  Torgersen             NaN            NaN                NaN          NaN    None  2007
4  Adelie  Torgersen            36.7           19.3              193.0       3450.0  female  2007
Data Preprocessing
Before we demonstrate backend switching, let’s clean our data by removing rows with missing values:
# Clean the dataset by filtering out null values
clean_penguins = penguins_expr.filter(
    _.bill_length_mm.notnull(),
    _.bill_depth_mm.notnull(),
    _.flipper_length_mm.notnull(),
    _.body_mass_g.notnull(),
    _.sex.notnull()
)

print(f"Cleaned dataset: {clean_penguins.count().execute()} rows")
print("Dataset is now ready for multi-backend operations!")
Cleaned dataset: 333 rows
Dataset is now ready for multi-backend operations!
Backend Switching with into_backend()
Xorq’s powerful into_backend() method allows seamless data movement between different query engines using Apache Arrow as an intermediate format.
Moving Data to DuckDB
Let’s move our cleaned penguins data to DuckDB for processing:
# Move data from Xorq backend to DuckDB
penguins_ddb = clean_penguins.into_backend(ddb, "penguins_clean")

print("Data successfully moved to DuckDB!")
print(f"All the expression backends: {penguins_ddb.ls.backends}")

# Perform DuckDB-specific aggregations
species_stats_ddb = (
    penguins_ddb
    .group_by(['species', 'sex'])
    .agg([
        _.count().name('count'),
        _.bill_length_mm.mean().name('avg_bill_length'),
        _.bill_depth_mm.mean().name('avg_bill_depth'),
        _.flipper_length_mm.mean().name('avg_flipper_length'),
        _.body_mass_g.mean().name('avg_body_mass')
    ])
    .order_by(['species', 'sex'])
)

print("\nSpecies Statistics (computed in DuckDB):")
species_stats_ddb.execute()
Data successfully moved to DuckDB!
All the expression backends: (<xorq.backends.duckdb.Backend object at 0x7f42e5a52f90>, <xorq.backends.let.Backend object at 0x7f4320427f80>)
Species Statistics (computed in DuckDB):
     species     sex  count  avg_bill_length  avg_bill_depth  avg_flipper_length  avg_body_mass
0     Adelie  female     73        37.257534       17.621918          187.794521    3368.835616
1     Adelie    male     73        40.390411       19.072603          192.410959    4043.493151
2  Chinstrap  female     34        46.573529       17.588235          191.735294    3527.205882
3  Chinstrap    male     34        51.094118       19.252941          199.911765    3938.970588
4     Gentoo  female     58        45.563793       14.237931          212.706897    4679.741379
5     Gentoo    male     61        49.473770       15.718033          221.540984    5484.836066
Moving Data to PostgreSQL
Now let’s demonstrate moving data to PostgreSQL for additional processing:
# Move the aggregated results to PostgreSQL for further analysis
species_stats_pg = species_stats_ddb.into_backend(pg, "species_statistics")

print("Aggregated data moved to PostgreSQL!")
print(f"PostgreSQL backends: {species_stats_pg.ls.backends}")

# Perform additional calculations in PostgreSQL
species_analysis = (
    species_stats_pg
    .mutate(
        bill_ratio=_.avg_bill_length / _.avg_bill_depth,
        body_mass_kg=_.avg_body_mass / 1000,
    )
    .select(['species', 'sex', 'count', 'bill_ratio', 'body_mass_kg'])
)

print("\nSpecies Analysis (computed in PostgreSQL):")
species_analysis.execute()
Aggregated data moved to PostgreSQL!
PostgreSQL backends: (<xorq.backends.postgres.Backend object at 0x7f42e60b3050>, <xorq.backends.duckdb.Backend object at 0x7f42e5a52f90>, <xorq.backends.let.Backend object at 0x7f4320427f80>)
Species Analysis (computed in PostgreSQL):
     species     sex  count  bill_ratio  body_mass_kg
0     Adelie  female     73    2.114272      3.368836
1     Adelie    male     73    2.117719      4.043493
2  Chinstrap  female     34    2.647993      3.527206
3  Chinstrap    male     34    2.653834      3.938971
4     Gentoo  female     58    3.200170      4.679741
5     Gentoo    male     61    3.147580      5.484836
Caching Pipeline Results
Xorq provides sophisticated caching mechanisms to optimize performance and avoid recomputation. Let’s explore different storage types.
ParquetStorage for Persistent Caching
ParquetStorage provides durable persistence by caching results as Parquet files on disk:
from pathlib import Path

# Create ParquetStorage for caching
cache_storage = ParquetStorage(source=con, base_path=Path.cwd() / "cache")

# Cache the cleaned penguins data
cached_penguins = clean_penguins.cache(storage=cache_storage)

print("Data cached using ParquetStorage!")
print(f"Cache storage location: {cache_storage.base_path}")

# Subsequent operations will use cached data
print("\nFirst execution (writes to cache):")
result1 = cached_penguins.count().execute()
print(f"Count: {result1}")

print("\nSecond execution (reads from cache):")
result2 = cached_penguins.count().execute()
print(f"Count: {result2}")
Data cached using ParquetStorage!
Cache storage location: /home/runner/work/xorq/xorq/docs/how_to/cache
First execution (writes to cache):
Count: 333
Second execution (reads from cache):
Count: 333
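To confirm that the cache was actually written to disk, you can list the contents of the cache directory with plain pathlib. This snippet only assumes the base_path used above; the file names themselves are generated by Xorq (typically hash-like, derived from the expression), so they will differ between runs.

from pathlib import Path

# Inspect the on-disk cache created by ParquetStorage above
cache_dir = Path.cwd() / "cache"
for path in sorted(cache_dir.rglob("*.parquet")):
    size_kib = path.stat().st_size / 1024
    print(f"{path.relative_to(cache_dir)}  ({size_kib:.1f} KiB)")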
SourceStorage for Backend-Specific Caching
SourceStorage persists cached results in the source backend you pass to it and automatically invalidates the cache when the upstream data changes:
# Create SourceStorage using DuckDB as the caching backend
ddb_storage = SourceStorage(source=ddb)

# Cache a complex aggregation using DuckDB
complex_analysis = (
    clean_penguins
    .group_by('species')
    .agg([
        _.bill_length_mm.mean().name('avg_bill_length'),
        _.bill_length_mm.std().name('std_bill_length'),
        _.bill_depth_mm.mean().name('avg_bill_depth'),
        _.bill_depth_mm.std().name('std_bill_depth'),
        _.count().name('sample_count')
    ])
    .mutate(
        bill_length_cv=_.std_bill_length / _.avg_bill_length,
        bill_depth_cv=_.std_bill_depth / _.avg_bill_depth
    )
    .cache(storage=ddb_storage)  # Cache in DuckDB
)

print("Complex analysis cached in DuckDB!")
complex_analysis.execute()
Complex analysis cached in DuckDB!
     species  avg_bill_length  std_bill_length  avg_bill_depth  std_bill_depth  sample_count  bill_length_cv  bill_depth_cv
0  Chinstrap        48.833824         3.339256       18.420588        1.135395            68        0.068380       0.061637
1     Adelie        38.823973         2.662597       18.347260        1.219338           146        0.068581       0.066459
2     Gentoo        47.568067         3.106116       14.996639        0.985998           119        0.065298       0.065748
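To verify that SourceStorage kept the cached result inside DuckDB rather than on disk, you can list the tables known to the DuckDB connection. This assumes the backend exposes the standard Ibis list_tables() method; the cached table gets a generated, hash-like name, so expect an unfamiliar entry alongside any tables you registered yourself.

# Tables registered on the DuckDB connection, including the
# generated table that SourceStorage created for the cached result
print(ddb.list_tables())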
Chaining Caches Across Multiple Engines
One of Xorq’s powerful features is the ability to chain caches across different backends:
# Create a pipeline with multiple cache points across different backends
pipeline_result = (
    clean_penguins
    .cache(storage=ParquetStorage(con))      # Cache initial data
    .into_backend(ddb, "temp_penguins")      # Move to DuckDB
    .mutate(
        bmi=_.body_mass_g / (_.flipper_length_mm / 1000) ** 2,  # Calculate BMI-like metric
        bill_size_index=_.bill_length_mm * _.bill_depth_mm
    )
    .cache(storage=SourceStorage(ddb))       # Cache in DuckDB
    .into_backend(pg, "enriched_penguins")   # Move to PostgreSQL
    .cache(storage=SourceStorage(pg))        # Final cache in PostgreSQL
)

print("Multi-stage pipeline with cross-backend caching created!")
final_result = pipeline_result.execute()
print(f"Final pipeline result: {len(final_result)} rows")
Multi-stage pipeline with cross-backend caching created!
Final pipeline result: 333 rows
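Because execute() returns a pandas DataFrame, the columns added by the mutate step can be inspected directly; this is plain pandas on the final_result object produced above.

# Peek at the derived columns added in the DuckDB stage of the pipeline
print(final_result[["species", "sex", "bmi", "bill_size_index"]].head())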
Xorq’s pipeline operations provide the foundation for building robust, scalable data processing workflows that can efficiently utilize multiple backends while maintaining excellent performance through intelligent caching.