Route a step to a specific backend

Use into_backend() to run each pipeline step on the engine that suits it

This guide shows you how to move a pipeline step onto a specific backend with into_backend(). You have a working pipeline, but one step would run better elsewhere: for example, a heavy filter that belongs next to the data in PostgreSQL, or a UDF that needs the embedded engine. One method call reroutes it.

Data moves between engines as Arrow record batches, streamed rather than materialized, so the transfer doesn’t require temp files or a full copy in memory.

Prerequisites

Steps

1. Decide where the step should run

Each step should run on the engine best suited to it. Typical reasons to move:

| Move to | When |
| --- | --- |
| PostgreSQL (or your warehouse) | The data already lives there and the step reduces it: filter and aggregate before pulling anything out |
| DuckDB | The step needs analytical strength: AsOf joins, window-heavy queries, scanning Parquet files |
| Embedded (DataFusion) | The step uses Xorq features tied to the embedded engine: Python UDFs, ML steps, Flight serving |
| Nowhere (stay put) | The current backend handles the step fine; every hop has a cost, so don’t pay it without a reason |

A useful default: reduce data on the backend that stores it, then move the small result to the engine doing the specialized work.

2. Move the expression with into_backend()

Start with a filter in DuckDB, then hand the result to the embedded backend:

import xorq.api as xo

ddb = xo.duckdb.connect()
con = xo.connect()

penguins = xo.examples.penguins.fetch(backend=ddb)

filtered = penguins.filter(
    xo._.body_mass_g.notnull(),
    xo._.sex == "female",
)

moved = filtered.into_backend(con, "female_penguins")

into_backend() takes the target connection and an optional table name. The result is a new expression rooted in the target backend; the original is untouched.

3. Continue the pipeline on the new backend

Everything after the move runs on the target engine:

result = (
    moved.group_by("species")
    .agg(
        n=xo._.count(),
        avg_mass=xo._.body_mass_g.mean(),
    )
    .execute()
)
print(result)

Output:

     species   n     avg_mass
0     Adelie  73  3368.835616
1     Gentoo  58  4679.741379
2  Chinstrap  34  3527.205882

The filter executed in DuckDB; the aggregation executed on the embedded backend. The expression stays deferred until .execute(), so the transfer also waits until then.

4. Check which engines the pipeline touches

The .ls accessor shows the backends an expression spans:

expr = moved.group_by("species").agg(n=xo._.count())
print(expr.ls.backends)
print(expr.ls.is_multiengine)

Output:

(<xorq.backends.xorq_datafusion.Backend object at 0x7f7090991370>, <xorq.backends.duckdb.Backend object at 0x7f7090991a00>)
True

If is_multiengine is True when you expected a single engine (or the other way around), a step landed somewhere you didn’t intend.
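One way to catch a misplaced step early is to turn this check into a small guard. The sketch below is a hypothetical helper, not part of xorq: check_engines and its expect_multiengine parameter are names invented here. It relies only on the two attributes shown above, expr.ls.backends and expr.ls.is_multiengine.

```python
def check_engines(expr, expect_multiengine):
    """Raise if the expression spans engines differently than intended.

    Hypothetical helper: it reads only expr.ls.is_multiengine and
    expr.ls.backends, the two attributes shown in this guide.
    """
    ls = expr.ls
    if ls.is_multiengine != expect_multiengine:
        # Report the module of each backend class, for example
        # "xorq.backends.duckdb", to show where the steps landed.
        modules = [type(backend).__module__ for backend in ls.backends]
        raise AssertionError(
            f"expected is_multiengine={expect_multiengine}, "
            f"got {ls.is_multiengine}; backends: {modules}"
        )
    return ls.backends


# Usage with the expression from this step (requires xorq):
# check_engines(expr, expect_multiengine=True)
```

Calling it right before .execute() makes an unintended hop fail loudly instead of showing up later as a slow run.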

Tip

Every .execute() re-reads the source and moves the data across again. If you run the pipeline more than once, add .cache() right after .into_backend(...) so the move only happens on the first run and later runs read the cached copy. See Cache results by backend.
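The tip can be sketched as a small wrapper that chains the two calls. move_and_cache is a hypothetical helper name, not part of xorq, and calling .cache() with no arguments is an assumption based on the wording above; the caching guide covers the storage options.

```python
def move_and_cache(expr, target_con, table_name=None):
    """Move expr onto target_con, then cache the moved expression.

    Hypothetical helper: it only chains into_backend() and cache(),
    the two calls named in this guide.
    """
    if table_name is None:
        moved = expr.into_backend(target_con)
    else:
        moved = expr.into_backend(target_con, table_name)
    # Assumption: cache() is called without arguments here; pass the
    # storage options your setup needs, as described in the caching guide.
    return moved.cache()


# Usage with the objects from the steps above (requires xorq):
# cached = move_and_cache(filtered, con, "female_penguins")
# result = cached.group_by("species").agg(n=xo._.count()).execute()
```

Placing the cache directly after the move means repeated runs skip the cross-engine transfer, not only the later steps.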

See also