import functools
import os

import pandas as pd
import toolz
import xgboost as xgb
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics import mean_absolute_error

import xorq as xo
import xorq.vendor.ibis.expr.datatypes as dt
from xorq.caching import (
    ParquetStorage,
    SourceStorage,
)
from xorq.common.utils.defer_utils import deferred_read_parquet
from xorq.common.utils.import_utils import import_python
from xorq.expr.ml import (
    deferred_fit_predict,
    deferred_fit_transform_series_sklearn,
    train_test_splits,
)
from xorq.flight import (
    FlightServer,
    FlightUrl,
)

# Import the helper modules we used in previous parts.
# `m` supplies the HackerNews fetcher UDXF, `o` the sentiment UDXF.
m = import_python(xo.options.pins.get_path("hackernews_lib"))
# The OpenAI helper module is loaded from a local checkout. Set OPENAI_LIB_PATH
# to point at it; the default assumes the script runs from the repository root.
o = import_python(
    os.environ.get("OPENAI_LIB_PATH", "examples/libs/openai_lib.py")
)
XGBoost Model Serving
Overview
In this tutorial (Part 4 of our series), you’ll learn how to:
- Deploy your trained models as microservices using xorq’s Flight serving capabilities
- Create Flight servers to serve your TF-IDF transformation and XGBoost prediction models
- Use these services to make predictions on new data
- Build an end-to-end inference pipeline
Previously:
- Part 1: Data Ingestion and Model-Assisted Labeling
- Part 2: Feature Engineering with TF-IDF
- Part 3: XGBoost Model Training
Prerequisites
- Completed Part 1 (Data Ingestion and Model-Assisted Labeling)
- Completed Part 2 (Feature Engineering with TF-IDF)
- Completed Part 3 (Training XGBoost Models)
- Python 3.8+ installed on your system
- Basic understanding of client-server architecture
Installation and Imports
First, ensure you have all required packages:
pip install xorq pandas scikit-learn xgboost
Then import the necessary modules:
Model Training Recap
First, let’s quickly recap our model training process from Part 3. Assume we’ve already:
- Defined our XGBoost model functions:
@toolz.curry
def fit_xgboost_model(feature_df, target_series, seed=0):
    """Fit an XGBoost multiclass model on a column of feature vectors.

    Parameters
    ----------
    feature_df : pandas.DataFrame
        Single-column frame whose cells are per-row feature vectors
        (here, the TF-IDF output column).
    target_series : pandas.Series
        Integer sentiment labels (NEGATIVE=0, NEUTRAL=1, POSITIVE=2).
    seed : int, default 0
        Random seed passed to XGBoost.

    Returns
    -------
    xgboost.XGBRegressor
        The fitted model.
    """
    xgb_r = xgb.XGBRegressor(
        objective="multi:softmax",  # predict a class index directly
        num_class=3,  # three sentiment labels
        eval_metric=mean_absolute_error,
        max_depth=6,
        n_estimators=10,
        seed=seed,
    )
    # squeeze(axis=1) always yields a Series. A bare squeeze() collapses a 1x1
    # frame to its scalar cell, which would turn the single feature vector into
    # a column of values instead of one row of features.
    X = pd.DataFrame(feature_df.squeeze(axis=1).tolist())
    xgb_r.fit(X, target_series)
    return xgb_r
@toolz.curry
def predict_xgboost_model(model, df):
    """Predict sentiment labels with a model from ``fit_xgboost_model``.

    ``df`` is a single-column frame whose cells are feature vectors; the raw
    output of ``model.predict`` is returned.
    """
    # squeeze(axis=1) keeps a one-row input as a one-element Series, so the
    # model always receives a list of feature vectors (2-D input) rather than
    # a single flattened vector.
    return model.predict(df.squeeze(axis=1).tolist())
- Set up our column names and deferred operations:
# TF-IDF is fit on the story title; XGBoost predicts the integer sentiment.
transform_col = "title"
transformed_col = f"{transform_col}_transformed"
features = (transformed_col,)
target = "sentiment_int"
target_predicted = f"{target}_predicted"

# Deferred TF-IDF fit/transform over `transform_col`, returning float64 arrays.
deferred_fit_transform_tfidf = deferred_fit_transform_series_sklearn(
    col=transform_col,
    cls=TfidfVectorizer,
    return_type=dt.Array(dt.float64),
)

# Deferred XGBoost fit/predict built from the curried model functions.
deferred_fit_predict_xgb = deferred_fit_predict(
    target=target,
    features=list(features),
    fit=fit_xgboost_model,
    predict=predict_xgboost_model,
    return_type=dt.float32,
)
- Processed our data and split it into training and testing sets:
name = "hn-fetcher-input-small"
con = xo.connect()
storage = ParquetStorage(source=con)
# Uncomment on the first run to create the "caching" database used below:
# xo.postgres.connect_env().create_catalog("caching")
pg = xo.postgres.connect_env(database="caching")

# Load and process the data
raw_expr = deferred_read_parquet(
    con,
    xo.options.pins.get_path(name),
    name,
).pipe(m.do_hackernews_fetcher_udxf)

t = (
    raw_expr
    # Keep only stories that have text.
    .filter(xo._.text.notnull())
    .cache(storage=SourceStorage(pg))
    .pipe(o.do_hackernews_sentiment_udxf, con=con)
    .cache(storage=SourceStorage(pg))
    .cache(storage=ParquetStorage(con))
    # Drop rows whose sentiment value contains "ERROR".
    .filter(~xo._.sentiment.contains("ERROR"))
    # Encode labels as integers: NEGATIVE=0, NEUTRAL=1, POSITIVE=2.
    .mutate(
        sentiment_int=xo._.sentiment.cases(
            {"POSITIVE": 2, "NEUTRAL": 1, "NEGATIVE": 0}.items()
        ).cast(int)
    )
)

# Split into train and test sets
(train_expr, test_expr) = t.pipe(
    train_test_splits,
    unique_key="id",
    test_sizes=(0.6, 0.4),
    random_seed=42,
)
- Trained our TF-IDF and XGBoost models:
# Fit and transform with TF-IDF
(deferred_tfidf_model, tfidf_udaf, deferred_tfidf_transform) = (
    deferred_fit_transform_tfidf(
        train_expr,
        storage=storage,
    )
)
train_tfidf_transformed = train_expr.mutate(
    **{transformed_col: deferred_tfidf_transform.on_expr}
)

# Fit and predict with XGBoost
(deferred_xgb_model, xgb_udaf, deferred_xgb_predict) = deferred_fit_predict_xgb(
    train_tfidf_transformed,
    storage=storage,
)
train_xgb_predicted = (
    train_tfidf_transformed
    # Moving into a fresh backend before predicting avoids an
    # ArrowNotImplementedError ("Unsupported cast").
    .into_backend(xo.connect())
    .mutate(**{target_predicted: deferred_xgb_predict.on_expr})
)

# Define test pathway: same TF-IDF transform and XGBoost prediction.
test_xgb_predicted = (
    test_expr.mutate(**{transformed_col: deferred_tfidf_transform.on_expr})
    .into_backend(xo.connect())
    .mutate(**{target_predicted: deferred_xgb_predict.on_expr})
)

# Evaluate models: distribution of predictions per true label.
x = train_xgb_predicted.execute()
y = test_xgb_predicted.execute()
print(x.groupby(target)[target_predicted].describe().T)
print(y.groupby(target)[target_predicted].describe().T)
Understanding Flight in xorq
Before diving into the implementation, let’s understand what Flight is and how it works in xorq.
Apache Arrow Flight is a high-performance client-server framework for moving large datasets over the network. In xorq, Flight serves as the foundation for deploying models as microservices.
Key components of Flight in xorq:
1. `FlightServer`: hosts your models and transformations as services
2. `FlightUrl`: specifies the endpoint where your service is available
3. `flight_serve` (`xo.expr.relations.flight_serve`): creates a Flight server from an xorq expression
Setting Up Flight Servers
Now that we have our trained models, let’s serve them using Flight.
Defining Port and Server Configuration
First, let’s define the ports where our services will be available:
# Define ports for our servers
transform_port = 8915
predict_port = 8916
Creating Flight Servers for TF-IDF and XGBoost Models
Now, let’s set up Flight servers for our TF-IDF transformation and XGBoost prediction services:
# Create Flight servers for our models
(transform_server, transform_do_exchange) = xo.expr.relations.flight_serve(
    test_expr.into_backend(xo.connect()).mutate(
        **{transformed_col: deferred_tfidf_transform.on_expr}
    ),
    make_server=functools.partial(FlightServer, FlightUrl(port=transform_port)),
)
(predict_server, predict_do_exchange) = xo.expr.relations.flight_serve(
    test_xgb_predicted,
    make_server=functools.partial(FlightServer, FlightUrl(port=predict_port)),
)
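The `flight_serve` function creates a Flight server from an xorq expression. It returns a pair:
1. The server instance, which you can start and stop
2. A `do_exchange` callable bound to that server; the command that clients use to invoke the service is stored in its arguments (see "Extracting Server Commands" below)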
The `make_server` parameter allows you to customize the server configuration, such as specifying the port.
Extracting Server Commands
Each Flight server provides a unique command that clients use to invoke the service:
# Extract the commands for each server: each do_exchange callable carries its
# command in args[1].
transform_command, predict_command = (
    do_exchange.args[1]
    for do_exchange in (transform_do_exchange, predict_do_exchange)
)
Creating New Data for Prediction
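Let’s prepare some new data to make predictions on. Freshly fetched stories have no sentiment yet, so we add `sentiment` and `sentiment_int` as typed null placeholders to match the columns of the data the services were built on: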
# Create a dataset of new HackerNews stories
# (presumably "maxitem" is the newest item id and "n" the number to fetch).
z = (
    xo.memtable([{"maxitem": 43346282, "n": 1000}])
    .pipe(m.do_hackernews_fetcher_udxf)
    .filter(xo._.text.notnull())
    .mutate(
        **{
            # Typed null placeholders so the columns match the training data.
            "sentiment": xo.literal(None).cast(str),
            "sentiment_int": xo.literal(None).cast(int),
        }
    )
)
Making Predictions Using the Services
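Now that we have our Flight servers set up and new data to predict on, we can chain the two services directly in xorq: the output of the TF-IDF service is registered as a table and passed to the prediction service: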
# Use the servers directly in xorq to make predictions: the TF-IDF output is
# registered as table "t" and fed to the prediction service.
# Note: do_exchange here takes expr (not RecordBatchReader like in a client)
out = predict_do_exchange(
    xo.register(transform_do_exchange(z), "t")
).read_pandas()
When you call `do_exchange` directly in the server script, it receives an xorq expression. Client-side usage is different: there, `do_exchange` receives a PyArrow `RecordBatchReader`. This is an important distinction to keep in mind.
Best Practices for Flight Services
When deploying models using Flight, consider these best practices:
- Monitoring: Add logging to track service health and performance
- Error Handling: Implement robust error handling for client-server communication
- Scaling: Deploy multiple instances behind a load balancer for high-traffic scenarios
- Security: Configure authentication and authorization for production deployments
- Versioning: Implement versioning for your model endpoints to handle model updates
Summary and Next Steps
Congratulations! In this fourth part of our tutorial series, you’ve:
1. Deployed trained models as microservices using Flight
2. Set up servers for TF-IDF transformation and XGBoost prediction
3. Prepared new data for prediction
4. Used the Flight services to make predictions
5. Learned best practices for model serving
This completes our end-to-end tutorial series on sentiment analysis with xorq, covering:
- Data ingestion and preprocessing (Part 1)
- Feature engineering with TF-IDF (Part 2)
- Model training with XGBoost (Part 3)
- Model serving with Flight (Part 4)
Further Reading
Appendix
HackerNews Serving Appendix
import functools
import os

import pandas as pd
import toolz
import xgboost as xgb
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics import mean_absolute_error

import xorq as xo
import xorq.vendor.ibis.expr.datatypes as dt
from xorq.caching import (
    ParquetStorage,
    SourceStorage,
)
from xorq.common.utils.defer_utils import deferred_read_parquet
from xorq.common.utils.import_utils import import_python
from xorq.expr.ml import (
    deferred_fit_predict,
    deferred_fit_transform_series_sklearn,
    train_test_splits,
)
from xorq.flight import (
    FlightServer,
    FlightUrl,
)

# `m` supplies the HackerNews fetcher helpers, `o` the sentiment UDXF.
m = import_python(xo.options.pins.get_path("hackernews_lib"))
# The OpenAI helper module is loaded from a local checkout. Set OPENAI_LIB_PATH
# to point at it; the default assumes the script runs from the repository root.
o = import_python(
    os.environ.get("OPENAI_LIB_PATH", "examples/libs/openai_lib.py")
)
@toolz.curry
def fit_xgboost_model(feature_df, target_series, seed=0):
    """Fit an XGBoost multiclass model on a column of feature vectors.

    Parameters
    ----------
    feature_df : pandas.DataFrame
        Single-column frame whose cells are per-row feature vectors
        (here, the TF-IDF output column).
    target_series : pandas.Series
        Integer sentiment labels (NEGATIVE=0, NEUTRAL=1, POSITIVE=2).
    seed : int, default 0
        Random seed passed to XGBoost.

    Returns
    -------
    xgboost.XGBRegressor
        The fitted model.
    """
    xgb_r = xgb.XGBRegressor(
        objective="multi:softmax",  # predict a class index directly
        num_class=3,  # three sentiment labels
        eval_metric=mean_absolute_error,
        max_depth=6,
        n_estimators=10,
        seed=seed,
    )
    # squeeze(axis=1) always yields a Series. A bare squeeze() collapses a 1x1
    # frame to its scalar cell, which would turn the single feature vector into
    # a column of values instead of one row of features.
    X = pd.DataFrame(feature_df.squeeze(axis=1).tolist())
    xgb_r.fit(X, target_series)
    return xgb_r
@toolz.curry
def predict_xgboost_model(model, df):
    """Predict sentiment labels with a model from ``fit_xgboost_model``.

    ``df`` is a single-column frame whose cells are feature vectors; the raw
    output of ``model.predict`` is returned.
    """
    # squeeze(axis=1) keeps a one-row input as a one-element Series, so the
    # model always receives a list of feature vectors (2-D input) rather than
    # a single flattened vector.
    return model.predict(df.squeeze(axis=1).tolist())
# TF-IDF is fit on the story title; XGBoost predicts the integer sentiment.
transform_col = "title"
transformed_col = f"{transform_col}_transformed"
features = (transformed_col,)
target = "sentiment_int"
target_predicted = f"{target}_predicted"

# Deferred TF-IDF fit/transform over `transform_col`, returning float64 arrays.
deferred_fit_transform_tfidf = deferred_fit_transform_series_sklearn(
    col=transform_col,
    cls=TfidfVectorizer,
    return_type=dt.Array(dt.float64),
)

# Deferred XGBoost fit/predict built from the curried model functions.
deferred_fit_predict_xgb = deferred_fit_predict(
    target=target,
    features=list(features),
    fit=fit_xgboost_model,
    predict=predict_xgboost_model,
    return_type=dt.float32,
)
# Flight UDXF wrapping m.get_hackernews_stories_batch. The pipeline below pipes
# through m.do_hackernews_fetcher_udxf rather than this local definition.
do_hackernews_fetcher_udxf = xo.expr.relations.flight_udxf(
    process_df=m.get_hackernews_stories_batch,
    maybe_schema_in=m.schema_in.to_pyarrow(),
    maybe_schema_out=m.schema_out.to_pyarrow(),
    name="HackerNewsFetcher",
)
name = "hn-fetcher-input-small"
con = xo.connect()
storage = ParquetStorage(source=con)

# Create the "caching" database only if it is missing, so the script can be
# re-run without failing on an existing catalog.
pg_admin = xo.postgres.connect_env()
if "caching" not in pg_admin.list_catalogs():
    pg_admin.create_catalog("caching")
pg = xo.postgres.connect_env(database="caching")

raw_expr = deferred_read_parquet(
    con,
    xo.options.pins.get_path(name),
    name,
).pipe(m.do_hackernews_fetcher_udxf)

t = (
    raw_expr
    # Most stories have a title, but few have text; keep only those with text.
    .filter(xo._.text.notnull())
    .cache(storage=SourceStorage(pg))
    .pipe(o.do_hackernews_sentiment_udxf, con=con)
    # NOTE: removing this cache changes the hashes of downstream expressions.
    .cache(storage=SourceStorage(pg))
    .cache(storage=ParquetStorage(con))
    # Drop rows whose sentiment value contains "ERROR".
    .filter(~xo._.sentiment.contains("ERROR"))
    # Encode labels as integers: NEGATIVE=0, NEUTRAL=1, POSITIVE=2.
    .mutate(
        sentiment_int=xo._.sentiment.cases(
            {"POSITIVE": 2, "NEUTRAL": 1, "NEGATIVE": 0}.items()
        ).cast(int)
    )
)

(train_expr, test_expr) = t.pipe(
    train_test_splits,
    unique_key="id",
    test_sizes=(0.6, 0.4),
    random_seed=42,
)
# fit-transform
(deferred_tfidf_model, tfidf_udaf, deferred_tfidf_transform) = (
    deferred_fit_transform_tfidf(
        train_expr,
        storage=storage,
    )
)
train_tfidf_transformed = train_expr.mutate(
    **{transformed_col: deferred_tfidf_transform.on_expr}
)

# fit-predict
(deferred_xgb_model, xgb_udaf, deferred_xgb_predict) = deferred_fit_predict_xgb(
    train_tfidf_transformed,
    storage=storage,
)
train_xgb_predicted = (
    train_tfidf_transformed
    # Moving into a fresh backend before predicting avoids an
    # ArrowNotImplementedError ("Unsupported cast").
    .into_backend(xo.connect())
    .mutate(**{target_predicted: deferred_xgb_predict.on_expr})
)

# Test pathway: reuse the fitted TF-IDF transform and XGBoost model.
test_xgb_predicted = (
    test_expr.mutate(**{transformed_col: deferred_tfidf_transform.on_expr})
    # into_backend avoids the same ArrowNotImplementedError as above.
    # TODO: document why the stable name introduced here is required.
    .into_backend(xo.connect())
    .mutate(**{target_predicted: deferred_xgb_predict.on_expr})
)

# Distribution of predictions per true label, on train and test.
x = train_xgb_predicted.execute()
y = test_xgb_predicted.execute()
print(x.groupby(target)[target_predicted].describe().T)
print(y.groupby(target)[target_predicted].describe().T)
# fetch live and predict
# (presumably "maxitem" is the newest item id and "n" the number to fetch).
z = (
    xo.memtable([{"maxitem": 43346282, "n": 1000}])
    .pipe(m.do_hackernews_fetcher_udxf)
    .filter(xo._.text.notnull())
    .mutate(
        **{
            # Typed null placeholders so the columns match the training data.
            "sentiment": xo.literal(None).cast(str),
            "sentiment_int": xo.literal(None).cast(int),
        }
    )
)
# Ports for the two Flight services
transform_port = 8915
predict_port = 8916

# Serve the TF-IDF transform and the XGBoost prediction pipelines.
# TODO: document why into_backend(xo.connect()) (a stable name) is required
# for the served transform expression.
(transform_server, transform_do_exchange) = xo.expr.relations.flight_serve(
    test_expr.into_backend(xo.connect()).mutate(
        **{transformed_col: deferred_tfidf_transform.on_expr}
    ),
    make_server=functools.partial(FlightServer, FlightUrl(port=transform_port)),
)
(predict_server, predict_do_exchange) = xo.expr.relations.flight_serve(
    test_xgb_predicted,
    make_server=functools.partial(FlightServer, FlightUrl(port=predict_port)),
)

# Each do_exchange callable carries its command in args[1].
transform_command, predict_command = (
    do_exchange.args[1]
    for do_exchange in (transform_do_exchange, predict_do_exchange)
)

# Chain the services in-process: the TF-IDF output is registered as table "t"
# and fed to the prediction service.
# NOTE: called in-process, do_exchange takes an xorq expression; a remote
# client passes a pyarrow RecordBatchReader instead.
out = predict_do_exchange(
    xo.register(transform_do_exchange(z), "t")
).read_pandas()
# Regression check: each server listens on the requested port and exposes the
# expected command (the hex suffix presumably derives from the expression hash,
# so any change to the pipeline changes it).
expected_transform_command = "execute-unbound-expr-f4961c805729fe38a39304b1317f4f20"
expected_predict_command = "execute-unbound-expr-dcfac288264dbae089b63d7a9b7d95d8"
assert (transform_server.flight_url.port, transform_command) == (
    transform_port,
    expected_transform_command,
), (transform_command, expected_transform_command)
assert (predict_server.flight_url.port, predict_command) == (
    predict_port,
    expected_predict_command,
), (predict_command, expected_predict_command)