import xorq as xo
from xorq.caching import SourceStorage

# Connect to source database
pg = xo.postgres.connect_env()
con = xo.connect()  # empty connection

# Create source storage
storage = SourceStorage(source=con)

# Register table from postgres and cache it
batting = pg.table("batting")

# Cache the filtered data in the source backend
cached = (
    batting.filter(batting.yearID == 2015)
    .cache(storage=storage)  # cache expression
)

# Execute the query - results will be cached
result = xo.execute(cached)
Caching
The core concepts for understanding caching
Caching System
xorq provides a caching system for efficient iterative development of ML pipelines. It allows you to:
- Cache results from upstream query engines
- Persist data locally or in remote storage
- Automatically invalidate cache when source data changes
- Chain caches across multiple engines
Storage Types
xorq supports two main types of cache storage, SourceStorage and SnapshotStorage, plus ParquetStorage, a special case of SourceStorage:
1. SourceStorage
- Automatically invalidates cache when upstream data changes
- Persistence depends on the source backend
- Supports both remote (Snowflake, Postgres) and in-process (pandas, DuckDB) backends
2. SnapshotStorage
- No automatic invalidation
- Ideal for one-off analyses
- Persistence depends on the source backend
3. ParquetStorage
- Special case of SourceStorage
- Caches results as Parquet files on local disk
- Uses source backend for writing and reading
- Ensures durable persistence
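The difference between automatic invalidation and snapshot behaviour can be illustrated with a toy sketch. This is not xorq's implementation: the classes `ToySourceCache` and `ToySnapshotCache` and the `fingerprint` argument are hypothetical names chosen for illustration, and the only assumption is that an invalidating cache includes some fingerprint of the upstream data in its key while a snapshot cache does not.

```python
from typing import Any, Callable, Dict, Hashable, Tuple


class ToySourceCache:
    """Hypothetical cache whose key includes an upstream-data fingerprint.

    When the fingerprint changes, the old entry is no longer found,
    so the value is recomputed (automatic invalidation).
    """

    def __init__(self) -> None:
        self._store: Dict[Tuple[str, Hashable], Any] = {}

    def get(self, name: str, fingerprint: Hashable, compute: Callable[[], Any]) -> Any:
        key = (name, fingerprint)
        if key not in self._store:
            self._store[key] = compute()
        return self._store[key]


class ToySnapshotCache:
    """Hypothetical cache keyed by name only: upstream changes are ignored."""

    def __init__(self) -> None:
        self._store: Dict[str, Any] = {}

    def get(self, name: str, fingerprint: Hashable, compute: Callable[[], Any]) -> Any:
        # The fingerprint is accepted for a uniform interface but never used.
        if name not in self._store:
            self._store[name] = compute()
        return self._store[name]


rows = [1, 2, 3]  # stands in for upstream data
source_cache = ToySourceCache()
snapshot_cache = ToySnapshotCache()


def total() -> int:
    return sum(rows)


first_source = source_cache.get("total", tuple(rows), total)
first_snapshot = snapshot_cache.get("total", tuple(rows), total)

rows.append(4)  # upstream data changes

second_source = source_cache.get("total", tuple(rows), total)      # recomputed
second_snapshot = snapshot_cache.get("total", tuple(rows), total)  # stale snapshot
```

After the upstream change, the invalidating cache returns the new total while the snapshot cache keeps returning the value it first stored, which is why a snapshot suits one-off analyses.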
Hashing Strategies
Cache invalidation uses different hashing strategies based on the storage type:
| Storage Type | Hash Components |
|---|---|
| In-Memory | Data bytes + Schema |
| Disk-Based | Query plan + Schema |
| Remote | Table metadata + Last modified time |
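As a rough illustration of the table above, each strategy can be thought of as hashing a different pair of components into a cache key. The sketch below is an assumption-laden toy, not xorq's actual hashing code: the function names, the use of SHA-256, and the string representations of schema, query plan, and metadata are all hypothetical.

```python
import hashlib


def _digest(*parts: bytes) -> str:
    """Hash the parts in order, length-prefixing each one so that
    different splits of the same bytes cannot collide."""
    h = hashlib.sha256()
    for part in parts:
        h.update(len(part).to_bytes(8, "big"))
        h.update(part)
    return h.hexdigest()


def in_memory_key(data: bytes, schema: str) -> str:
    # In-Memory: data bytes + schema
    return _digest(data, schema.encode())


def disk_key(query_plan: str, schema: str) -> str:
    # Disk-Based: query plan + schema
    return _digest(query_plan.encode(), schema.encode())


def remote_key(table_metadata: str, last_modified: float) -> str:
    # Remote: table metadata + last modified time
    return _digest(table_metadata.encode(), repr(last_modified).encode())


# Hypothetical inputs for demonstration only.
key_before = remote_key("public.batting", 1700000000.0)
key_same = remote_key("public.batting", 1700000000.0)
key_after = remote_key("public.batting", 1700000500.0)
```

Here `key_before` and `key_same` are identical, so a cached result would be reused, whereas `key_after` differs because the last modified time changed, which models a cache miss after an upstream update.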