import xorq as xo
import xorq.expr.datatypes as dt


@xo.udf.make_pandas_udf(
    schema=xo.schema({"title": str, "url": str}),
    return_type=dt.bool,
    name="url_in_title",
)
def url_in_title(df):
    """Flag the rows whose URL appears verbatim inside the title.

    Args:
        df: pandas DataFrame carrying the ``title`` and ``url`` columns
            declared in the UDF schema above.

    Returns:
        The result of a row-wise ``df.apply``: one boolean per input row.
        Rows with a missing or empty URL yield ``False``.
    """
    return df.apply(
        # "" is a substring of every string, so a missing/empty URL must be
        # rejected explicitly instead of being reported as "in the title".
        lambda s: bool(s.url) and s.url in (s.title or ""),
        axis=1,
    )


# Connect to xorq's embedded engine
con = xo.connect()

# Reference the parquet file
name = "hn-data-small.parquet"
expr = xo.deferred_read_parquet(
    con,
    xo.options.pins.get_path(name),
    name,
).mutate(**{"url_in_title": url_in_title.on_expr})

# Display results
print(expr.execute().head())
by id parent text time type \
0 benkan 43181839 NaN None 1740558278 story
1 iancmceachern 43181846 NaN None 1740558328 story
2 michidk 43181847 NaN None 1740558346 story
3 journal 43181850 NaN None 1740558374 story
4 r0b3r4 43181862 NaN None 1740558501 story
kids descendants score \
0 [43181958] 1.0 2.0
1 [43182101, 43181865] 1.0 4.0
2 [43184793, 43182580] 4.0 27.0
3 None 0.0 2.0
4 [43181863, 43183390, 43182923, 43184536, 43184... 42.0 122.0
title \
0 Guaranteed Ways to Annoy Your Product Manager ...
1 Ex-Intel exec, blames the bureaucratic 'PowerP...
2 A CLI to quickly launch VSCode/cursor devconta...
3 Architecture of my accounting project [video]
4 Telescope – an open-source web-based log viewe...
url url_in_title
0 https://www.leadinginproduct.com/p/10-guarante... False
1 https://www.pcgamer.com/hardware/ex-intel-exec... False
2 https://github.com/michidk/vscli False
3 https://www.youtube.com/watch?v=4-ZKIAUYboA False
4 https://github.com/iamtelescope/telescope False
Save this file as example.py.
Create a Sentiment Analysis Pipeline
You can also build more complex pipelines that process data and expose endpoints via Arrow Flight:
import pathlib
import pickle

import pandas as pd
import toolz
import xgboost as xgb
import xorq as xo
import xorq.expr.datatypes as dt
from xorq.common.utils.import_utils import import_python

# Define paths to models and data
TFIDF_MODEL_PATH = pathlib.Path(xo.options.pins.get_path("hn_tfidf_fitted_model"))
XGB_MODEL_PATH = pathlib.Path(xo.options.pins.get_path("hn_sentiment_reg"))
HACKERNEWS_DATA_NAME = "hn-fetcher-input-small"

# Import HackerNews library from pinned path
hackernews_lib = import_python(xo.options.pins.get_path("hackernews_lib"))


# Load pre-trained models
def load_models():
    """Load the fitted TF-IDF transformer and the XGBoost regressor from disk.

    Returns:
        A ``(transformer, xgb_model)`` tuple: the unpickled object stored at
        ``TFIDF_MODEL_PATH`` and an ``xgb.XGBRegressor`` loaded from
        ``XGB_MODEL_PATH``.
    """
    # SECURITY: pickle.loads executes arbitrary code embedded in the payload.
    # Only point TFIDF_MODEL_PATH at artifacts you trust.
    transformer = pickle.loads(TFIDF_MODEL_PATH.read_bytes())
    xgb_model = xgb.XGBRegressor()
    xgb_model.load_model(XGB_MODEL_PATH)
    return transformer, xgb_model


def predict_sentiment(titles):
    """Return the regressor's predictions for the given titles.

    The titles are passed through the transformer's ``transform`` and the
    result is fed to the XGBoost model's ``predict``.

    NOTE: the models are loaded from disk again on every call.
    """
    transformer, xgb_model = load_models()
    return xgb_model.predict(transformer.transform(titles))


# Create a pandas UDF for sentiment prediction
@xo.udf.make_pandas_udf(
    schema=xo.schema({"title": str}),
    return_type=dt.float64,
    name="title_transformed",
)
def transform_predict(df):
    """Pandas UDF body: score the ``title`` column of ``df``."""
    return predict_sentiment(df["title"])


# Define a pipeline
connection = xo.connect()
pipeline = (
    xo.deferred_read_parquet(
        connection,
        xo.options.pins.get_path(HACKERNEWS_DATA_NAME),
        HACKERNEWS_DATA_NAME,
    )
    .pipe(hackernews_lib.do_hackernews_fetcher_udxf)
    .select(xo._.title)
    .mutate(sentiment_score=transform_predict.on_expr)
)

# Execute the pipeline
results = pipeline.execute()
/home/runner/work/xorq/xorq/.venv/lib/python3.12/site-packages/sklearn/base.py:440: InconsistentVersionWarning: Trying to unpickle estimator TfidfTransformer from version 1.6.1 when using version 1.7.0. This might lead to breaking code or invalid results. Use at your own risk. For more info please refer to:
https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations
warnings.warn(
/home/runner/work/xorq/xorq/.venv/lib/python3.12/site-packages/sklearn/base.py:440: InconsistentVersionWarning: Trying to unpickle estimator TfidfVectorizer from version 1.6.1 when using version 1.7.0. This might lead to breaking code or invalid results. Use at your own risk. For more info please refer to:
https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations
warnings.warn(
Save this file as sentiment_pipeline.py.
Serve Your Pipeline
CLI Commands
Build (experimental)
Xorq makes it easy to serialize the pipeline in a diffable and human-readable format, including YAML for expressions, compiled SQL, and deferred reads. Once these artifacts are checked into git, we can build validation, lineage, and documentation tools in the CI/CD process.
❯ xorq build sentiment_pipeline.py -e pipeline
Building pipeline from sentiment_pipeline.py
Written 'pipeline' to builds/36293178ec4f
The build artifacts are serialized to disk in the builds directory by default:
❯ ls -a builds/831efa9ba0ec/
8db502c29647.sql
32f00c2c2c8b.sql
deferred_reads.yaml
expr.yaml
metadata.json
profiles.yaml
sql.yaml
Run
Execute the serialized expressions by using xorq run:
❯ xorq run builds/831efa9ba0ec/
Serve (coming soon)
Deploy your pipeline as a service:
❯ xorq serve builds/36293178ec4f/
Advanced Features
Using Arrow Flight for Microservices
Create data microservices with Arrow Flight:
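Expose your data pipeline as a service using Arrow Flight. The snippet below continues sentiment_pipeline.py: it relies on the xo, dt and pd imports and on the predict_sentiment function defined there: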
from xorq.flight import FlightServer, FlightUrl
from xorq.flight.exchanger import make_udxf

# NOTE: this snippet uses xo, dt, pd and predict_sentiment without defining
# them; they must already be in scope when it runs.

# Define schemas for the service
schema_in = xo.schema({"title": str})
schema_out = xo.schema({"sentiment_score": dt.double})


# Flight server function
def sentiment_analysis(df: pd.DataFrame):
    """Score every title in ``df`` and return the scores as a DataFrame.

    Args:
        df: DataFrame with a ``title`` column (see ``schema_in``).

    Returns:
        A DataFrame with a single float64 ``sentiment_score`` column holding
        one score per input row (see ``schema_out``).
    """
    scores = predict_sentiment(df["title"])
    # Keep one score per row. Collapsing the predictions with float(scores)
    # only works for a single-row batch and raises TypeError otherwise.
    return pd.DataFrame({"sentiment_score": scores}, dtype="float64")


# Create the UDXF for Flight server
sentiment_udxf = make_udxf(
    sentiment_analysis,
    schema_in.to_pyarrow(),
    schema_out.to_pyarrow(),
)

# Start the Flight server with our exchanger
flight_port = 8815
flight_server = FlightServer(
    FlightUrl(port=flight_port),
    exchangers=[sentiment_udxf],
)
flight_server.serve()
And now, we can connect and test our service:
# Test the service
client = flight_server.client
# Pre-bind the exchanger's command so callers only supply the input batches.
do_sentiment = toolz.curry(client.do_exchange, sentiment_udxf.command)


def test_flight_service():
    """Send one sample title through the Flight exchanger and print the reply."""
    test_data = xo.memtable(
        {"title": ["This is an amazing HackerNews post"]}, schema=schema_in
    )
    result = do_sentiment(test_data.to_pyarrow_batches())
    # The second element of the exchange result is read into pandas;
    # presumably it is the response stream reader -- TODO confirm.
    res = result[1].read_pandas()
    print("Flight service test result:\n", res)


print("Testing Flight service...")
test_flight_service()