Processed our data and split it into training and testing sets:
    name = "hn-fetcher-input-large"
    con = xo.connect()
    storage = ParquetStorage(source=con)
    pg = xo.postgres.connect_env(database="caching")

    # Load and process the data
    raw_expr = (
        deferred_read_parquet(
            con,
            xo.options.pins.get_path(name),
            name,
        )
        .pipe(m.do_hackernews_fetcher_udxf)
    )
    t = (
        raw_expr
        .filter(xo._.text.notnull())
        .cache(storage=SourceStorage(pg))
        .pipe(o.do_hackernews_sentiment_udxf, con=con)
        .cache(storage=SourceStorage(pg))
        .cache(storage=ParquetStorage(con))
        .filter(~xo._.sentiment.contains("ERROR"))
        .mutate(
            sentiment_int=xo._.sentiment.cases(
                {"POSITIVE": 2, "NEUTRAL": 1, "NEGATIVE": 0}.items()
            ).cast(int)
        )
    )

    # Split into train and test sets
    (train_expr, test_expr) = t.pipe(
        train_test_splits,
        unique_key="id",
        test_sizes=(0.6, 0.4),
        random_seed=42,
    )
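To build some intuition for the `unique_key`, `test_sizes`, and `random_seed` arguments, here is a minimal plain-Python sketch of one common way to get a reproducible split: hash each row's key together with the seed and assign the row to a split by bucket. This is an illustration of the general idea only, not xorq's actual `train_test_splits` implementation; `assign_split` and `num_buckets` are hypothetical names introduced for the example.

```python
import hashlib


def assign_split(unique_key, test_sizes=(0.6, 0.4), random_seed=42, num_buckets=10_000):
    """Return the index of the split that `unique_key` falls into (hypothetical helper)."""
    # Hash the key together with the seed: the assignment is deterministic,
    # but changes when the seed changes.
    digest = hashlib.sha256(f"{random_seed}:{unique_key}".encode()).hexdigest()
    bucket = int(digest, 16) % num_buckets
    # Walk the cumulative split boundaries until the bucket fits.
    cumulative = 0.0
    for index, size in enumerate(test_sizes):
        cumulative += size
        if bucket < cumulative * num_buckets:
            return index
    return len(test_sizes) - 1  # guard against floating-point round-off


# Toy rows standing in for HackerNews stories; "id" plays the role of unique_key.
rows = [{"id": i, "text": f"story {i}"} for i in range(1000)]
train_rows = [r for r in rows if assign_split(r["id"]) == 0]
test_rows = [r for r in rows if assign_split(r["id"]) == 1]
```

Because the assignment depends only on the key and the seed, rerunning the pipeline on the same data places every row in the same split, which is what makes cached intermediate results reusable.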
Trained our TF-IDF and XGBoost models:
    # Fit and transform with TF-IDF
    (deferred_tfidf_model, tfidf_udaf, deferred_tfidf_transform) = (
        deferred_fit_transform_tfidf(
            train_expr,
            storage=storage,
        )
    )
    train_tfidf_transformed = train_expr.mutate(
        **{transformed_col: deferred_tfidf_transform.on_expr}
    )

    # Fit and predict with XGBoost
    (deferred_xgb_model, xgb_udaf, deferred_xgb_predict) = deferred_fit_predict_xgb(
        train_tfidf_transformed,
        storage=storage,
    )
    train_xgb_predicted = (
        train_tfidf_transformed
        .into_backend(xo.connect())
        .mutate(**{target_predicted: deferred_xgb_predict.on_expr})
    )

    # Define test pathway
    test_xgb_predicted = (
        test_expr
        .mutate(**{transformed_col: deferred_tfidf_transform.on_expr})
        .into_backend(xo.connect())
        .mutate(**{target_predicted: deferred_xgb_predict.on_expr})
    )

    # Evaluate models
    x = train_xgb_predicted.execute()
    y = test_xgb_predicted.execute()
    print(x.groupby("sentiment_int").sentiment_int_predicted.describe().T)
    print(y.groupby("sentiment_int").sentiment_int_predicted.describe().T)
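Note that the test pathway reuses the TF-IDF transform fitted on the training set rather than fitting a new one. The plain-Python sketch below illustrates that fit-once, transform-many pattern. It assumes the simplest TF-IDF variant (term frequency times `log(N / df)`), which may differ from the smoothed weighting a real vectorizer applies; `fit_idf` and `transform` are hypothetical helpers, not part of xorq.

```python
import math
from collections import Counter


def fit_idf(documents):
    """Learn inverse document frequencies from tokenized training documents."""
    n_docs = len(documents)
    doc_freq = Counter(token for doc in documents for token in set(doc))
    # Plain idf = log(N / df); libraries often use smoothed variants instead.
    return {token: math.log(n_docs / df) for token, df in doc_freq.items()}


def transform(document, idf):
    """Weight one tokenized document using an already fitted idf table."""
    counts = Counter(document)
    total = len(document)
    # Tokens that were never seen during fitting are dropped.
    return {t: (c / total) * idf[t] for t, c in counts.items() if t in idf}


# Fit on the "training" documents only ...
train_docs = [s.split() for s in ["great launch", "terrible launch", "great great product"]]
idf = fit_idf(train_docs)

# ... then apply the same fitted table to unseen text.
test_vector = transform("great new product".split(), idf)
```

Here `new` never appeared in the training documents, so it carries no weight in `test_vector`. The same thing happens to unseen vocabulary when a fitted model is later served against fresh data.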
Before diving into the implementation, let’s understand what Flight is and how
it works in xorq.
Apache Arrow Flight is a high-performance client-server framework for moving
large datasets over the network. In xorq, Flight serves as the foundation for
deploying models as microservices.
Key components of Flight in xorq:
FlightServer: Hosts your models and transformations as services
FlightUrl: Specifies the endpoint where your service is available
flight_serve: Function to create a Flight server from an xorq expression
Each Flight server provides a unique command that clients use to invoke the service:
    # Extract the commands for each server
    (transform_command, predict_command) = (
        do_exchange.args[1]
        for do_exchange in (transform_do_exchange, predict_do_exchange)
    )
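To make the role of the command concrete, here is a toy, standard-library-only model of the exchange pattern: each service is addressed by a command, receives a stream of batches, and streams results back. It does not use the Arrow Flight or xorq APIs; `ToyService`, the command byte strings, and the placeholder "prediction" rule are all assumptions made up for this sketch.

```python
from typing import Callable, Iterable, Iterator, List

Batch = List[dict]
Handler = Callable[[Iterable[Batch]], Iterator[Batch]]


class ToyService:
    """Toy stand-in for one served expression, addressed by a command."""

    def __init__(self, command: bytes, handler: Handler):
        self.command = command
        self._handler = handler

    def do_exchange(self, command: bytes, batches: Iterable[Batch]) -> Iterator[Batch]:
        # A request is only accepted if it names this service's command.
        if command != self.command:
            raise ValueError(f"unknown command: {command!r}")
        return self._handler(batches)


def transform(batches):
    # Stand-in for the TF-IDF step: add a derived feature column.
    for batch in batches:
        yield [{**row, "n_tokens": len(row["text"].split())} for row in batch]


def predict(batches):
    # Stand-in for the XGBoost step: a placeholder rule, not a model.
    for batch in batches:
        yield [{**row, "predicted": int(row["n_tokens"] > 2)} for row in batch]


transform_service = ToyService(b"transform-cmd", transform)
predict_service = ToyService(b"predict-cmd", predict)

# Chain the two services: the output stream of one feeds the other.
new_batches = [[{"text": "show hn a tiny parser"}], [{"text": "ask hn"}]]
transformed = transform_service.do_exchange(transform_service.command, new_batches)
results = [
    row
    for batch in predict_service.do_exchange(predict_service.command, transformed)
    for row in batch
]
```

The chaining mirrors how a client would pipe data through the transform service and then the predict service, using each service's own command to address it.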
Let’s prepare some new data to make predictions on:
    # Create a dataset of new HackerNews stories
    z = (
        xo.memtable([{"maxitem": 43346282, "n": 1000}])
        .pipe(m.do_hackernews_fetcher_udxf)
        .filter(xo._.text.notnull())
        .mutate(
            **{
                "sentiment": xo.literal(None).cast(str),
                "sentiment_int": xo.literal(None).cast(int),
            }
        )
    )
Now that we have our Flight servers set up and new data to predict on, we can
use the model directly in xorq:
    # Use the servers directly in xorq to make predictions
    # Note: do_exchange here takes expr (not RecordBatchReader like in a client)
    out = (
        predict_do_exchange(xo.register(transform_do_exchange(z), "t"))
        .read_pandas()
    )
When do_exchange is called directly in the server script, it receives an xorq
expression. On the client side, by contrast, it receives a PyArrow
RecordBatchReader. Keep this distinction in mind when moving code between the
server script and a client.