Quickstart

Installation

Xorq can be installed using pip:

pip install xorq

Or using nix to drop into an IPython shell:

nix run github:xorq-labs/xorq

Quick Start

Write a Simple Pandas UDF

import xorq as xo
import xorq.expr.datatypes as dt

@xo.udf.make_pandas_udf(
    schema=xo.schema({"title": str, "url": str}),
    return_type=dt.bool,
    name="url_in_title",
)
def url_in_title(df):
    return df.apply(
        lambda s: (s.url or "") in (s.title or ""),
        axis=1,
    )

# Connect to xorq's embedded engine
con = xo.connect()

# Reference the parquet file
name = "hn-data-small.parquet"
expr = xo.deferred_read_parquet(
    con,
    xo.options.pins.get_path(name),
    name,
).mutate(**{"url_in_title": url_in_title.on_expr})

# Display results
print(expr.execute().head())
              by        id  parent  text        time   type  \
0         benkan  43181839     NaN  None  1740558278  story   
1  iancmceachern  43181846     NaN  None  1740558328  story   
2        michidk  43181847     NaN  None  1740558346  story   
3        journal  43181850     NaN  None  1740558374  story   
4         r0b3r4  43181862     NaN  None  1740558501  story   

                                                kids  descendants  score  \
0                                         [43181958]          1.0    2.0   
1                               [43182101, 43181865]          1.0    4.0   
2                               [43184793, 43182580]          4.0   27.0   
3                                               None          0.0    2.0   
4  [43181863, 43183390, 43182923, 43184536, 43184...         42.0  122.0   

                                               title  \
0  Guaranteed Ways to Annoy Your Product Manager ...   
1  Ex-Intel exec, blames the bureaucratic 'PowerP...   
2  A CLI to quickly launch VSCode/cursor devconta...   
3      Architecture of my accounting project [video]   
4  Telescope – an open-source web-based log viewe...   

                                                 url  url_in_title  
0  https://www.leadinginproduct.com/p/10-guarante...         False  
1  https://www.pcgamer.com/hardware/ex-intel-exec...         False  
2                   https://github.com/michidk/vscli         False  
3        https://www.youtube.com/watch?v=4-ZKIAUYboA         False  
4          https://github.com/iamtelescope/telescope         False  

Save this file as example.py.

Create a Sentiment Analysis Pipeline

You can also build more complex pipelines that process data and expose endpoints via Arrow Flight:

import pathlib
import pickle

import toolz
import pandas as pd
import xgboost as xgb

import xorq as xo
import xorq.expr.datatypes as dt
from xorq.common.utils.import_utils import import_python

# Define paths to models and data
TFIDF_MODEL_PATH = pathlib.Path(xo.options.pins.get_path("hn_tfidf_fitted_model"))
XGB_MODEL_PATH = pathlib.Path(xo.options.pins.get_path("hn_sentiment_reg"))
HACKERNEWS_DATA_NAME = "hn-fetcher-input-small"

# Import HackerNews library from pinned path
hackernews_lib = import_python(xo.options.pins.get_path("hackernews_lib"))

# Load pre-trained models
def load_models():
    transformer = pickle.loads(TFIDF_MODEL_PATH.read_bytes())
    xgb_model = xgb.XGBRegressor()
    xgb_model.load_model(XGB_MODEL_PATH)
    return transformer, xgb_model

def predict_sentiment(titles):
    transformer, xgb_model = load_models()
    return xgb_model.predict(transformer.transform(titles))

# Create a pandas UDF for sentiment prediction
@xo.udf.make_pandas_udf(
    schema=xo.schema({"title": str}),
    return_type=dt.float64,
    name="title_transformed",
)
def transform_predict(df):
    return predict_sentiment(df["title"])

# Define a pipeline
connection = xo.connect()
pipeline = (
    xo.deferred_read_parquet(
        connection,
        xo.options.pins.get_path(HACKERNEWS_DATA_NAME),
        HACKERNEWS_DATA_NAME,
    )
    .pipe(hackernews_lib.do_hackernews_fetcher_udxf)
    .select(xo._.title)
    .mutate(sentiment_score=transform_predict.on_expr)
)

# Execute the pipeline
results = pipeline.execute()
/home/runner/work/xorq/xorq/.venv/lib/python3.12/site-packages/sklearn/base.py:440: InconsistentVersionWarning: Trying to unpickle estimator TfidfTransformer from version 1.6.1 when using version 1.7.0. This might lead to breaking code or invalid results. Use at your own risk. For more info please refer to:
https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations
  warnings.warn(
/home/runner/work/xorq/xorq/.venv/lib/python3.12/site-packages/sklearn/base.py:440: InconsistentVersionWarning: Trying to unpickle estimator TfidfVectorizer from version 1.6.1 when using version 1.7.0. This might lead to breaking code or invalid results. Use at your own risk. For more info please refer to:
https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations
  warnings.warn(

Save this file as sentiment_pipeline.py.

Serve Your Pipeline

CLI Commands

Build (experimental)

Xorq makes it easy to serialize the pipeline in a diffable and human-readable format, including YAML for expressions, compiled SQL, and deferred reads. Once these artifacts are checked into git, we can build validation, lineage, and documentation tools in the CI/CD process.

 xorq build example.py -e expr
Building expr from scrap.py
              by  ...  url_in_title
0         benkan  ...         False
1  iancmceachern  ...         False
2        michidk  ...         False
3        journal  ...         False
4         r0b3r4  ...         False
[5 rows x 12 columns]
Written 'expr' to builds/831efa9ba0ec

You can also build more complex pipelines:

 xorq build sentiment_pipeline.py -e pipeline
Building pipeline from sentiment_pipeline.py
Written 'pipeline' to builds/36293178ec4f

The build artifacts are serialized to disk in the builds directory by default:

❯ ls -a builds/831efa9ba0ec/
8db502c29647.sql
32f00c2c2c8b.sql
deferred_reads.yaml
expr.yaml
metadata.json
profiles.yaml
sql.yaml

Run

Execute the serialized expressions by using xorq run:

 xorq run builds/831efa9ba0ec/

Serve (coming soon)

Deploy your pipeline as a service:

 xorq serve builds/36293178ec4f/

Advanced Features

Using Arrow Flight for Microservices

Create data microservices with Arrow Flight:

Expose your data pipeline as a service using Arrow Flight:

from xorq.flight import FlightServer, FlightUrl
from xorq.flight.exchanger import make_udxf
# Define schemas for the service
schema_in = xo.schema({"title": str})
schema_out = xo.schema({"sentiment_score": dt.double})

# Flight server function
def sentiment_analysis(df: pd.DataFrame):
    scores = predict_sentiment(df["title"])
    return pd.DataFrame({"sentiment_score": [float(scores)]})

# Create the UDXF for Flight server
sentiment_udxf = make_udxf(
    sentiment_analysis, 
    schema_in.to_pyarrow(), 
    schema_out.to_pyarrow()
)

# Start the Flight server with our exchanger
flight_port = 8815
flight_server = FlightServer(
    FlightUrl(port=flight_port), 
    exchangers=[sentiment_udxf]
)
flight_server.serve()

And now, we can connect and test our service:

# Test the service
client = flight_server.client
do_sentiment = toolz.curry(client.do_exchange, sentiment_udxf.command)

def test_flight_service():
    test_data = xo.memtable(
        {"title": ["This is an amazing HackerNews post"]}, 
        schema=schema_in
    )
    result = do_sentiment(test_data.to_pyarrow_batches())
    res = result[1].read_pandas()
    print("Flight service test result:\n", res)

print("Testing Flight service...")
test_flight_service()

Next Steps

  • Explore more examples in the examples/ directory
  • Read the full API documentation
  • Join our community for support and discussions