Installation

Xorq can be installed using pip:

pip install xorq

Or using nix to drop into an IPython shell:

nix run github:xorq-labs/xorq

Quick Start

Write a Simple Pandas UDF

import xorq as xo
import xorq.expr.datatypes as dt

@xo.udf.make_pandas_udf(
    schema=xo.schema({"title": str, "url": str}),
    return_type=dt.bool,
    name="url_in_title",
)
def url_in_title(df):
    return df.apply(
        lambda s: (s.url or "") in (s.title or ""),
        axis=1,
    )

# Connect to xorq's embedded engine
con = xo.connect()

# Reference the parquet file
name = "hn-data-small.parquet"
expr = xo.deferred_read_parquet(
    con,
    xo.options.pins.get_path(name),
    name,
).mutate(**{"url_in_title": url_in_title.on_expr})

# Display results
print(expr.execute().head())

Save this file as example.py.

Create a Sentiment Analysis Pipeline

You can also build more complex pipelines that process data and expose endpoints via Arrow Flight:

import pathlib
import pickle

import toolz
import pandas as pd
import xgboost as xgb

import xorq as xo
import xorq.expr.datatypes as dt
from xorq.common.utils.import_utils import import_python

# Define paths to models and data
TFIDF_MODEL_PATH = pathlib.Path(xo.options.pins.get_path("hn_tfidf_fitted_model"))
XGB_MODEL_PATH = pathlib.Path(xo.options.pins.get_path("hn_sentiment_reg"))
HACKERNEWS_DATA_NAME = "hn-fetcher-input-small"

# Import HackerNews library from pinned path
hackernews_lib = import_python(xo.options.pins.get_path("hackernews_lib"))

# Load pre-trained models
def load_models():
    transformer = pickle.loads(TFIDF_MODEL_PATH.read_bytes())
    xgb_model = xgb.XGBRegressor()
    xgb_model.load_model(XGB_MODEL_PATH)
    return transformer, xgb_model

def predict_sentiment(titles):
    transformer, xgb_model = load_models()
    return xgb_model.predict(transformer.transform(titles))

# Create a pandas UDF for sentiment prediction
@xo.udf.make_pandas_udf(
    schema=xo.schema({"title": str}),
    return_type=dt.float64,
    name="title_transformed",
)
def transform_predict(df):
    return predict_sentiment(df["title"])

# Define a pipeline
connection = xo.connect()
pipeline = (
    xo.deferred_read_parquet(
        connection,
        xo.options.pins.get_path(HACKERNEWS_DATA_NAME),
        HACKERNEWS_DATA_NAME,
    )
    .pipe(hackernews_lib.do_hackernews_fetcher_udxf)
    .select(xo._.title)
    .mutate(sentiment_score=transform_predict.on_expr)
)

# Execute the pipeline
results = pipeline.execute()

Save this file as sentiment_pipeline.py.

Serve Your Pipeline

CLI Commands

Build (experimental)

Xorq makes it easy to serialize the pipeline in a diffable and human-readable format, including YAML for expressions, compiled SQL, and deferred reads. Once these artifacts are checked into git, we can build validation, lineage, and documentation tools in the CI/CD process.

❯ xorq build example.py -e expr
Building expr from scrap.py
              by  ...  url_in_title
0         benkan  ...         False
1  iancmceachern  ...         False
2        michidk  ...         False
3        journal  ...         False
4         r0b3r4  ...         False
[5 rows x 12 columns]
Written 'expr' to builds/831efa9ba0ec

You can also build more complex pipelines:

❯ xorq build sentiment_pipeline.py -e pipeline
Building pipeline from sentiment_pipeline.py
Written 'pipeline' to builds/36293178ec4f

The build artifacts are serialized to disk in the builds directory by default:

❯ ls -a builds/831efa9ba0ec/
8db502c29647.sql
32f00c2c2c8b.sql
deferred_reads.yaml
expr.yaml
metadata.json
profiles.yaml
sql.yaml

Run

Execute the serialized expressions by using xorq run:

❯ xorq run builds/831efa9ba0ec/

Serve (coming soon)

Deploy your pipeline as a service:

❯ xorq serve builds/36293178ec4f/

Advanced Features

Using Arrow Flight for Microservices

Create data microservices with Arrow Flight:

Expose your data pipeline as a service using Arrow Flight:

from xorq.flight import FlightServer, FlightUrl
from xorq.flight.exchanger import make_udxf
# Define schemas for the service
schema_in = xo.schema({"title": str})
schema_out = xo.schema({"sentiment_score": dt.double})

# Flight server function
def sentiment_analysis(df: pd.DataFrame):
    scores = predict_sentiment(df["title"])
    return pd.DataFrame({"sentiment_score": [float(scores)]})

# Create the UDXF for Flight server
sentiment_udxf = make_udxf(
    sentiment_analysis, 
    schema_in.to_pyarrow(), 
    schema_out.to_pyarrow()
)

# Start the Flight server with our exchanger
flight_port = 8815
flight_server = FlightServer(
    FlightUrl(port=flight_port), 
    exchangers=[sentiment_udxf]
)
flight_server.serve()

And now, we can connect and test our service:

# Test the service
client = flight_server.client
do_sentiment = toolz.curry(client.do_exchange, sentiment_udxf.command)

def test_flight_service():
    test_data = xo.memtable(
        {"title": ["This is an amazing HackerNews post"]}, 
        schema=schema_in
    )
    result = do_sentiment(test_data.to_pyarrow_batches())
    res = result[1].read_pandas()
    print("Flight service test result:\n", res)

print("Testing Flight service...")
test_flight_service()

Next Steps

  • Explore more examples in the examples/ directory
  • Read the full API documentation
  • Join our community for support and discussions