Cache Strategies, Storage, and Backend Invalidation

Every cache in xorq is the combination of a strategy and a storage. The strategy determines how the cache key is computed. The storage determines where the cached data lives. The cache classes you import from xorq.caching are named combinations of these two axes:

| Cache class | Strategy | Storage |
| --- | --- | --- |
| SourceCache | ModificationTime | SourceStorage |
| SourceSnapshotCache | Snapshot | SourceStorage |
| ParquetCache | ModificationTime | ParquetStorage |
| ParquetSnapshotCache | Snapshot | ParquetStorage |
| ParquetTTLSnapshotCache | Snapshot | ParquetTTLStorage |
| GCSCache | ModificationTime | GCStorage |

Strategies: how the key is computed

A cache key is a hex string that uniquely identifies a cached result. It is computed by hashing the expression via dask tokenization. The key is prefixed with letsql_cache- by default.

The two strategies differ in what goes into that hash.

ModificationTimeStrategy

The hash includes everything needed to detect that upstream data has changed. For a database table, that means backend-specific metadata:

  • Postgres: reltuples from pg_class — the estimated row count, which changes after INSERT/UPDATE/DELETE + ANALYZE
  • Snowflake: LAST_ALTERED timestamp from INFORMATION_SCHEMA.TABLES
  • BigQuery: last_modified_time from __TABLES__
  • DuckDB: data bytes for in-memory tables
  • DataFusion: data bytes for in-memory tables
  • Xorq-DataFusion: data bytes for in-memory tables
  • SQLite: COUNT(*) and MAX(id) for on-disk tables, or data bytes for in-memory
  • PyIceberg: snapshot IDs
  • deferred file reads (parquet, CSV): file inode number, mtime, and size
  • deferred URL reads (S3/GCS): last_modified, size, and e_tag headers
  • deferred URL reads (HTTP): Last-Modified and Content-Length headers

The result: if the source data changes, the cache key changes, no stored entry matches the new key, and the expression re-executes. This is what “automatic invalidation” means: it isn’t a timer or a watcher, it’s a key that changes when its inputs change.

SnapshotStrategy

The key is stable even if the underlying data changes. This is useful when any valid copy of the data is good enough and you don’t care about later updates. Only high-level information such as file path, table name, and schema goes into the hash.

import xorq.api as xo  # assumed import for the xo alias; older releases use `import xorq as xo`
from xorq.caching import ParquetSnapshotCache

# Caches once and never recomputes, even if the parquet file changes
expr = xo.deferred_read_parquet("sales.parquet").cache(
    ParquetSnapshotCache.from_kwargs()
)
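
To make the contrast between the two strategies concrete, here is a minimal sketch using only the standard library. It is not xorq's implementation: `hashlib.sha256` stands in for dask tokenization, and the function names `modification_time_key` and `snapshot_key` are hypothetical. The only details taken from the text are the default prefix and the idea that file metadata (inode number, mtime, size) either is or is not part of the hash.

```python
import hashlib
import os
import tempfile

KEY_PREFIX = "letsql_cache-"  # default prefix named in the text


def _digest(*parts: object) -> str:
    """Hash the parts into a hex string (stand-in for dask tokenization)."""
    joined = "\x00".join(repr(p) for p in parts)
    return KEY_PREFIX + hashlib.sha256(joined.encode()).hexdigest()


def modification_time_key(expr_sql: str, path: str) -> str:
    """Key that changes when the file's inode number, mtime, or size changes."""
    st = os.stat(path)
    return _digest(expr_sql, path, st.st_ino, st.st_mtime_ns, st.st_size)


def snapshot_key(expr_sql: str, path: str) -> str:
    """Key built only from high-level information, so it stays stable."""
    return _digest(expr_sql, path)


# Demo: appending to the file changes one key but not the other.
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "sales.csv")
    with open(path, "w") as f:
        f.write("amount\n100\n")
    query = "SELECT * FROM sales WHERE amount > 50"
    mod_before = modification_time_key(query, path)
    snap_before = snapshot_key(query, path)
    with open(path, "a") as f:
        f.write("200\n")
    mod_after = modification_time_key(query, path)
    snap_after = snapshot_key(query, path)

print(mod_before != mod_after)    # the file size changed, so the key changed
print(snap_before == snap_after)  # the snapshot key ignores file contents
```

In this sketch, a lookup under the modification-time key misses after the file is appended to, while a lookup under the snapshot key keeps hitting the old entry.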

Storage: where the data lives

SourceStorage

Stores cached results as tables inside a database backend — whichever backend you pass as source. If the source is a local, in-memory backend (DuckDB, DataFusion, Xorq) the cached result is an in-memory table. If it’s a remote database (Postgres, Snowflake, Trino), it’s a table in the remote database.

from xorq.caching import SourceCache

pg = xo.postgres.connect_env()
cache = SourceCache.from_kwargs(source=pg)

# Cached result is stored as a table in Postgres, named after the cache key
expr = pg.table("orders").filter(xo._.amount > 100).cache(cache)

SourceStorage checks for cache existence by looking at the backend’s table list: key in self.source.tables. On a miss, it creates a table with create_table or read_record_batches. On a hit, it returns a reference to the existing table.
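
The check-then-create pattern described above can be sketched with the standard library's `sqlite3` module standing in for a backend. This is an illustration under assumptions, not xorq's code: `list_tables` and `get_or_create` are hypothetical helpers, and the key value is made up.

```python
import sqlite3


def list_tables(con: sqlite3.Connection) -> set:
    """Names of all tables in the database (analogue of source.tables)."""
    rows = con.execute("SELECT name FROM sqlite_master WHERE type = 'table'")
    return {name for (name,) in rows}


def get_or_create(con: sqlite3.Connection, key: str, select_sql: str) -> tuple:
    """Return (table name, was_hit); materialize the query on a miss."""
    if key in list_tables(con):
        return key, True  # hit: hand back a reference to the existing table
    # Miss: store the result as a table named after the cache key.
    con.execute(f'CREATE TABLE "{key}" AS {select_sql}')
    return key, False


con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE orders (id INTEGER, amount REAL)")
con.executemany(
    "INSERT INTO orders VALUES (?, ?)", [(1, 50.0), (2, 150.0), (3, 300.0)]
)

key = "letsql_cache-0123abcd"  # hypothetical key, for illustration only
query = "SELECT * FROM orders WHERE amount > 100"
first = get_or_create(con, key, query)
second = get_or_create(con, key, query)
cached_rows = con.execute(f'SELECT COUNT(*) FROM "{key}"').fetchone()[0]

print(first, second, cached_rows)
```

The first call materializes the filtered rows into a new table; the second finds the table by name and does no work.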

ParquetStorage

Writes the cached result as a .parquet file on local disk. The default location is ~/.cache/xorq/parquet/, configurable via the XORQ_CACHE_DIR environment variable or via the relative_path and base_path constructor arguments of ParquetStorage.

from xorq.caching import ParquetCache

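con = xo.connect()  # default DataFusion backend
cache = ParquetCache.from_kwargs(source=con)

# Cached result is written as a parquet file on local disk
expr = pg.table("orders").filter(xo._.amount > 100).cache(cache)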

ParquetStorage checks for cache existence by checking if the parquet file exists on disk. On a miss, it materializes the expression result into a parquet file (writing to a .tmp file first, then atomically renaming). On a hit, it creates a deferred read of the parquet file.
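
The write-to-temp-then-rename step is what keeps a reader from ever seeing a half-written cache file. A minimal sketch of that pattern follows, assuming a hypothetical helper `get_or_write` and using placeholder bytes rather than real parquet data; `os.replace` performs the rename, which is atomic when both paths are on the same filesystem.

```python
import os
import tempfile
from pathlib import Path
from typing import Callable, Tuple


def get_or_write(
    cache_dir: Path, key: str, compute: Callable[[], bytes]
) -> Tuple[Path, bool]:
    """Return (path, was_hit); on a miss, write via a .tmp file and rename."""
    final_path = cache_dir / f"{key}.parquet"
    if final_path.exists():
        return final_path, True  # hit: the file is already complete
    tmp_path = final_path.with_name(final_path.name + ".tmp")
    tmp_path.write_bytes(compute())
    os.replace(tmp_path, final_path)  # atomic on the same filesystem
    return final_path, False


calls = []


def compute() -> bytes:
    calls.append(1)  # count how often the expensive step runs
    return b"placeholder bytes, not real parquet"


with tempfile.TemporaryDirectory() as tmp:
    cache_dir = Path(tmp)
    first_path, first_hit = get_or_write(cache_dir, "demo-key", compute)
    second_path, second_hit = get_or_write(cache_dir, "demo-key", compute)
    files = sorted(p.name for p in cache_dir.iterdir())

print(first_hit, second_hit, len(calls), files)
```

Because the existence check looks only at the final path, a crash during the write leaves a stray .tmp file but never a cache entry that looks valid.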

ParquetTTLStorage

Extends ParquetStorage with a time-to-live. The parquet file is treated as expired if its modification time is older than the configured TTL (default: 1 day). Combined with SnapshotStrategy, this gives you ParquetTTLSnapshotCache — a snapshot that automatically expires.
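
The expiry rule can be sketched as a single comparison between the file's modification time and the TTL. The helper name `is_fresh` is hypothetical and this is not xorq's code; the one-day default is the value stated above.

```python
import os
import tempfile
import time
from datetime import timedelta
from pathlib import Path
from typing import Optional


def is_fresh(
    path: Path, ttl: timedelta = timedelta(days=1), now: Optional[float] = None
) -> bool:
    """True if the file exists and its mtime is within the TTL."""
    if not path.exists():
        return False
    current = time.time() if now is None else now
    return current - path.stat().st_mtime <= ttl.total_seconds()


with tempfile.TemporaryDirectory() as tmp:
    cached = Path(tmp) / "demo-key.parquet"
    cached.write_bytes(b"placeholder")
    fresh_now = is_fresh(cached)

    # Backdate the file's mtime by two days to simulate an old cache entry.
    two_days_ago = time.time() - timedelta(days=2).total_seconds()
    os.utime(cached, (two_days_ago, two_days_ago))
    fresh_after_aging = is_fresh(cached)

    missing = is_fresh(Path(tmp) / "absent.parquet")

print(fresh_now, fresh_after_aging, missing)
```

An expired file is treated the same as a missing one, so the next execution recomputes and rewrites it.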

GCStorage

Stores parquet files in a Google Cloud Storage bucket instead of local disk. Same pattern as ParquetStorage, but reads and writes go through gcsfs.

Is the cache on the connection?

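For SourceStorage, yes: the cached table lives in whichever backend you pass as source. For the Parquet*Storage classes, no: the data lives in parquet files, and the connection is only used for writing on a cache miss and for creating the deferred read on a cache hit.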

pg = xo.postgres.connect_env()
ddb = xo.duckdb.connect()
con = xo.connect()  # default DataFusion backend

cache_in_pg = SourceCache.from_kwargs(source=pg)
cache_in_ddb = SourceCache.from_kwargs(source=ddb)
cache_on_disk = ParquetCache.from_kwargs(source=con)

When you call .cache() with no arguments, xorq creates a SourceCache whose source is the expression’s own backend:

# These are equivalent
expr.cache()
expr.cache(SourceCache.from_kwargs(source=expr._find_backend()))

What actually happens on .execute()

Caching is lazy. Nothing is cached until you call .execute(). Here is the sequence:

  1. Xorq walks the expression DAG bottom-up, looking for CachedNode operations (the nodes created by .cache())
  2. For each CachedNode, it calls cache.set_default(expr, default):
    • Computes the cache key from the expression
    • Checks if the storage already has data for that key
    • Cache miss: materializes the expression result, writes to storage, and proceeds as if there was a cache hit
    • Cache hit: for SourceStorage, returns a reference to the table; for ParquetStorage, returns a deferred read of the storage location
  3. The CachedNode in the DAG is replaced with the storage result
  4. Execution continues on the transformed DAG
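
The four steps above can be illustrated with a toy expression tree. Everything here is a hypothetical stand-in (`Source`, `Filter`, `Cached`, `resolve`, `evaluate`, and the precomputed key "k1"); it mirrors the described sequence, not xorq's actual classes.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List, Tuple, Union


@dataclass(frozen=True)
class Source:
    rows: Tuple[int, ...]


@dataclass(frozen=True)
class Filter:
    child: "Node"
    predicate: Callable[[int], bool]


@dataclass(frozen=True)
class Cached:  # toy counterpart of a CachedNode
    child: "Node"
    key: str  # hypothetical precomputed cache key


Node = Union[Source, Filter, Cached]


def evaluate(node: Node) -> Tuple[int, ...]:
    """Run a DAG that no longer contains Cached nodes."""
    if isinstance(node, Source):
        return node.rows
    if isinstance(node, Filter):
        return tuple(r for r in evaluate(node.child) if node.predicate(r))
    raise TypeError("Cached nodes must be resolved before evaluation")


def resolve(node: Node, storage: Dict[str, Tuple[int, ...]], log: List[str]) -> Node:
    """Walk bottom-up and replace each Cached node with its stored result."""
    if isinstance(node, Source):
        return node
    if isinstance(node, Filter):
        return Filter(resolve(node.child, storage, log), node.predicate)
    child = resolve(node.child, storage, log)
    if node.key not in storage:
        log.append(f"miss:{node.key}")
        storage[node.key] = evaluate(child)  # materialize and write
    else:
        log.append(f"hit:{node.key}")
    return Source(storage[node.key])  # proceed as on a hit


storage: Dict[str, Tuple[int, ...]] = {}
log: List[str] = []
evens = Cached(Filter(Source((1, 2, 3, 4, 5, 6)), lambda x: x % 2 == 0), key="k1")
expr = Filter(evens, lambda x: x > 2)

first = evaluate(resolve(expr, storage, log))
second = evaluate(resolve(expr, storage, log))
print(first, second, log)
```

Building `expr` caches nothing; the storage is only populated when `resolve` runs, which matches the lazy behavior described above.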

This is instrumented with OpenTelemetry — cache hits and misses emit trace events with the key, so you can see exactly what happened in your pipeline run.

Record batches and streaming

A record batch is Arrow’s unit of columnar data — a chunk of rows with a fixed schema. A RecordBatchReader is a streaming iterator that yields batches one at a time without loading the entire dataset into memory.

Record batches appear at every data-movement boundary in xorq:

  • Writing to ParquetStorage: value.to_expr().to_pyarrow_batches() streams batches from the source expression into a ParquetWriter. The full result never needs to live in memory at once.
  • Cross-engine transfer via into_backend(): data moves between engines as record batches over Arrow Flight or direct Arrow memory.
  • Ingestion into databases: read_record_batches() streams Arrow record batches into the target database, typically via ADBC (Arrow Database Connectivity).

The pattern is always the same: produce batches from one backend, consume them in another. This is how xorq moves data between engines without writing temp files or loading entire tables into memory.
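
The produce-and-consume pattern can be sketched without Arrow at all. In this illustration a plain Python generator stands in for a RecordBatchReader and a CSV writer stands in for the target backend; the function names are hypothetical. The point is only that the consumer holds one batch at a time.

```python
import csv
import io
from typing import Iterable, Iterator, List, Tuple

Batch = List[Tuple[int, float]]


def produce_batches(n_rows: int, batch_size: int) -> Iterator[Batch]:
    """Yield rows in fixed-size chunks instead of building one big list."""
    for start in range(0, n_rows, batch_size):
        stop = min(start + batch_size, n_rows)
        yield [(i, i * 1.5) for i in range(start, stop)]


def consume_batches(batches: Iterable[Batch], sink: io.StringIO) -> Tuple[int, int]:
    """Write each batch as it arrives; return (total rows, largest batch)."""
    writer = csv.writer(sink)
    total = largest = 0
    for batch in batches:
        writer.writerows(batch)
        total += len(batch)
        largest = max(largest, len(batch))
    return total, largest


sink = io.StringIO()
total_rows, largest_batch = consume_batches(produce_batches(10, 4), sink)
written_lines = len(sink.getvalue().splitlines())
print(total_rows, largest_batch, written_lines)
```

Ten rows are written in total, but no more than four are ever in memory at once on the consumer side.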

Cache invalidation on database tables

The ModificationTimeStrategy includes backend-specific metadata in the cache key hash. When that metadata changes, the hash changes, and the cached result is never matched again — the expression re-executes.

There are caveats per backend:

  • Postgres: reltuples is an estimate updated by ANALYZE / autovacuum. If you insert a row and immediately re-execute before autovacuum runs, the cache may still hit. Running ANALYZE on the table (for example, ANALYZE orders;) forces the statistics update.
  • Snowflake: uses LAST_ALTERED, which updates on any DDL or DML operation. More reliable than Postgres for detecting changes.
  • SQLite on disk: uses COUNT(*) and MAX(id). This requires a column named id and won’t detect updates that don’t change the count or max id.
  • DuckDB: for deferred reads, uses file inode information; for in-memory tables, hashes the actual data bytes. In-memory invalidation is precise but potentially expensive for large tables.
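
The SQLite caveat is easy to reproduce. The sketch below builds a key from COUNT(*) and MAX(id) only; `count_max_key` is a hypothetical helper, `hashlib.sha256` stands in for the real tokenization, and an in-memory database is used purely for convenience even though the text applies this signal to on-disk tables.

```python
import hashlib
import sqlite3


def count_max_key(con: sqlite3.Connection, table: str) -> str:
    """Key derived only from the row count and the largest id."""
    count, max_id = con.execute(
        f'SELECT COUNT(*), MAX(id) FROM "{table}"'
    ).fetchone()
    return hashlib.sha256(f"{table}:{count}:{max_id}".encode()).hexdigest()


con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, amount REAL)")
con.executemany(
    "INSERT INTO orders (id, amount) VALUES (?, ?)", [(1, 50.0), (2, 150.0)]
)
key_initial = count_max_key(con, "orders")

# An in-place UPDATE changes the data but neither COUNT(*) nor MAX(id).
con.execute("UPDATE orders SET amount = 999.0 WHERE id = 1")
key_after_update = count_max_key(con, "orders")

# An INSERT changes both, so the key moves.
con.execute("INSERT INTO orders (id, amount) VALUES (3, 10.0)")
key_after_insert = count_max_key(con, "orders")

print(key_initial == key_after_update)  # the update goes undetected
print(key_initial != key_after_insert)  # the insert is detected
```

A cache keyed this way would keep serving the stale amount after the UPDATE and only recompute after the INSERT.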

If you don’t want invalidation — you want a frozen snapshot regardless of what happens to the source — use a Snapshot variant (SourceSnapshotCache, ParquetSnapshotCache).

Backend support

| Backend | Cache key computation | Invalidation signal | ADBC ingestion | Notes |
| --- | --- | --- | --- | --- |
| Postgres | Yes | reltuples from pg_class | Yes (PgADBC) | Estimate; may lag behind writes until ANALYZE runs |
| Snowflake | Yes | LAST_ALTERED timestamp | Yes (SnowflakeADBC) | Reliable; updates on any DDL/DML |
| DuckDB | Yes | EXPLAIN plan / data bytes | No (direct register) | File reads use DDL; in-memory hashes data |
| SQLite | Yes | COUNT(*), MAX(id) (on-disk) / data bytes (in-memory) | Yes (SQLiteADBC) | On-disk invalidation requires id column |
| BigQuery | Yes | last_modified_time from __TABLES__ | | |
| PyIceberg | Yes | Iceberg snapshot IDs | No (uses create_table) | Change detection tied to Iceberg’s snapshot model |

Glossary

  • Cache key: a hex string derived from the expression and its upstream data. Same expression + same data = same key (input-addressed).
  • CachedNode: the internal DAG node created by .cache(). It wraps the uncached expression and a reference to the Cache object. During execution, it is replaced with the cached result.
  • deferred_read_parquet: a Read node that represents a lazy parquet read. It doesn’t load data until the expression is executed.
  • into_backend(): moves data from one engine to another via Arrow. Creates a RemoteTable node in the DAG.
  • RemoteTable: the DAG node representing data that will be transferred between backends.
  • Record batch: Arrow’s columnar data chunk. A RecordBatchReader streams these one at a time.
  • ADBC: Arrow Database Connectivity. The API that the Postgres, Snowflake, and SQLite backends use to ingest Arrow data.
  • Input-addressed: the cache key is derived from the input (expression + data state), not from a user-assigned name. Same input always maps to the same key.
  • Tokenization: the process of hashing an expression into a deterministic key. Xorq registers custom normalizers for every node type so that hashing is always deterministic.