Overview

In this tutorial (Part 4 of our series), you’ll learn how to:

  • Deploy your trained models as microservices using xorq’s Flight serving capabilities
  • Create Flight servers to serve your TF-IDF transformation and XGBoost prediction models
  • Use these services to make predictions on new data
  • Build an end-to-end inference pipeline

Previously,

Prerequisites

Installation and Imports

First, ensure you have all required packages:

pip install xorq pandas scikit-learn xgboost

Then import the necessary modules:

import functools

import pandas as pd
import toolz
import xgboost as xgb
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics import mean_absolute_error

import xorq as xo
import xorq.vendor.ibis.expr.datatypes as dt
from xorq.caching import (
    ParquetStorage,
    SourceStorage,
)
from xorq.common.utils.defer_utils import deferred_read_parquet
from xorq.common.utils.import_utils import import_python
from xorq.expr.ml import (
    deferred_fit_predict,
    deferred_fit_transform_series_sklearn,
    train_test_splits,
)
from xorq.flight import (
    FlightServer,
    FlightUrl,
)

# Import the helper modules we used in previous parts
m = import_python(xo.options.pins.get_path("hackernews_lib"))
o = import_python(xo.options.pins.get_path("openai_lib"))

Model Training Recap

First, let’s quickly recap our model training process from Part 3. Assume we’ve already:

  1. Defined our XGBoost model functions:
@toolz.curry
def fit_xgboost_model(feature_df, target_series, seed=0):
    xgb_r = xgb.XGBRegressor(
        objective="multi:softmax",
        num_class=3,
        eval_metric=mean_absolute_error,
        max_depth=6,
        n_estimators=10,
        seed=seed,
    )
    X = pd.DataFrame(feature_df.squeeze().tolist())
    xgb_r.fit(X, target_series)
    return xgb_r

@toolz.curry
def predict_xgboost_model(model, df):
    return model.predict(df.squeeze().tolist())
  1. Set up our column names and deferred operations:
transform_col = "title"
features = (transformed_col,) = (f"{transform_col}_transformed",)
target = "sentiment_int"
target_predicted = f"{target}_predicted"

deferred_fit_transform_tfidf = deferred_fit_transform_series_sklearn(
    col=transform_col,
    cls=TfidfVectorizer,
    return_type=dt.Array(dt.float64),
)

deferred_fit_predict_xgb = deferred_fit_predict(
    target=target,
    features=list(features),
    fit=fit_xgboost_model,
    predict=predict_xgboost_model,
    return_type=dt.float32,
)
  1. Processed our data and split it into training and testing sets:
name = "hn-fetcher-input-large"
con = xo.connect()
storage = ParquetStorage(source=con)
pg = xo.postgres.connect_env(database="caching")

# Load and process the data
raw_expr = (
    deferred_read_parquet(
        con,
        xo.options.pins.get_path(name),
        name,
    )
    .pipe(m.do_hackernews_fetcher_udxf)
)

t = (
    raw_expr
    .filter(xo._.text.notnull())
    .cache(storage=SourceStorage(pg))
    .pipe(o.do_hackernews_sentiment_udxf, con=con)
    .cache(storage=SourceStorage(pg))
    .cache(storage=ParquetStorage(con))
    .filter(~xo._.sentiment.contains("ERROR"))
    .mutate(
        sentiment_int=xo._.sentiment.cases(
            {"POSITIVE": 2, "NEUTRAL": 1, "NEGATIVE": 0}.items()
        ).cast(int)
    )
)

# Split into train and test sets
(train_expr, test_expr) = t.pipe(
    train_test_splits,
    unique_key="id",
    test_sizes=(0.6, 0.4),
    random_seed=42,
)
  1. Trained our TF-IDF and XGBoost models:
# Fit and transform with TF-IDF
(deferred_tfidf_model, tfidf_udaf, deferred_tfidf_transform) = (
    deferred_fit_transform_tfidf(
        train_expr,
        storage=storage,
    )
)
train_tfidf_transformed = train_expr.mutate(
    **{transformed_col: deferred_tfidf_transform.on_expr}
)

# Fit and predict with XGBoost
(deferred_xgb_model, xgb_udaf, deferred_xgb_predict) = deferred_fit_predict_xgb(
    train_tfidf_transformed,
    storage=storage,
)
train_xgb_predicted = (
    train_tfidf_transformed
    .into_backend(xo.connect()).mutate(
        **{target_predicted: deferred_xgb_predict.on_expr}
    )
)

# Define test pathway
test_xgb_predicted = (
    test_expr.mutate(**{transformed_col: deferred_tfidf_transform.on_expr})
    .into_backend(xo.connect())
    .mutate(**{target_predicted: deferred_xgb_predict.on_expr}
    )
)

# Evaluate models
x = train_xgb_predicted.execute()
y = test_xgb_predicted.execute()
print(x.groupby("sentiment_int").sentiment_int_predicted.describe().T)
print(y.groupby("sentiment_int").sentiment_int_predicted.describe().T)

Understanding Flight in xorq

Before diving into the implementation, let’s understand what Flight is and how it works in xorq.

Apache Arrow Flight is a high-performance client-server framework for moving large datasets over the network. In xorq, Flight serves as the foundation for deploying models as microservices.

Key components of Flight in xorq:

  1. FlightServer: Hosts your models and transformations as services
  2. FlightUrl: Specifies the endpoint where your service is available
  3. flight_serve: Function to create a Flight server from an xorq expression

Setting Up Flight Servers

Now that we have our trained models, let’s serve them using Flight.

Defining Port and Server Configuration

First, let’s define the ports where our services will be available:

# Define ports for our servers
transform_port = 8915
predict_port = 8916

Creating Flight Servers for TF-IDF and XGBoost Models

Now, let’s set up Flight servers for our TF-IDF transformation and XGBoost prediction services:

# Create Flight servers for our models
(transform_server, transform_do_exchange) = xo.expr.relations.flight_serve(
    test_expr.into_backend(xo.connect()).mutate(
        **{transformed_col: deferred_tfidf_transform.on_expr}
    ),
    make_server=functools.partial(FlightServer, FlightUrl(port=transform_port)),
)

(predict_server, predict_do_exchange) = xo.expr.relations.flight_serve(
    test_xgb_predicted,
    make_server=functools.partial(FlightServer, FlightUrl(port=predict_port)),
)

The flight_serve function creates a Flight server from an xorq expression. It returns:

  1. The server instance, which you can start and stop
  2. A command that clients can use to interact with the server

The make_server parameter allows you to customize the server configuration, such as specifying the port.

Extracting Server Commands

Each Flight server provides a unique command that clients use to invoke the service:

# Extract the commands for each server
(transform_command, predict_command) = (
    do_exchange.args[1] for do_exchange in (transform_do_exchange, predict_do_exchange)
)

Creating New Data for Prediction

Let’s prepare some new data to make predictions on:

# Create a dataset of new HackerNews stories
z = (
    xo.memtable([{"maxitem": 43346282, "n": 1000}])
    .pipe(m.do_hackernews_fetcher_udxf)
    .filter(xo._.text.notnull())
    .mutate(
        **{
            "sentiment": xo.literal(None).cast(str),
            "sentiment_int": xo.literal(None).cast(int),
        }
    )
)

Making Predictions Using the Services

Now that we have our Flight servers set up and new data to predict on, we can use the model directly in xorq:

# Use the servers directly in xorq to make predictions
# Note: do_exchange here takes expr (not RecordBatchReader like in a client)
out = predict_do_exchange(xo.register(transform_do_exchange(z), "t")).read_pandas()

When using do_exchange directly in the server script, it receives an xorq expression, which is different from client-side usage where it would receive a PyArrow RecordBatchReader. This is an important distinction to be aware of.

Best Practices for Flight Services

When deploying models using Flight, consider these best practices:

  1. Monitoring: Add logging to track service health and performance
  2. Error Handling: Implement robust error handling for client-server communication
  3. Scaling: Deploy multiple instances behind a load balancer for high-traffic scenarios
  4. Security: Configure authentication and authorization for production deployments
  5. Versioning: Implement versioning for your model endpoints to handle model updates

Summary and Next Steps

Congratulations! In this fourth part of our tutorial series, you’ve:

  1. Deployed trained models as microservices using Flight
  2. Set up servers for TF-IDF transformation and XGBoost prediction
  3. Prepared new data for prediction
  4. Used the Flight services to make predictions
  5. Learned best practices for model serving

This completes our end-to-end tutorial series on sentiment analysis with xorq, covering:

  • Data ingestion and preprocessing (Part 1)
  • Feature engineering with TF-IDF (Part 2)
  • Model training with XGBoost (Part 3)
  • Model serving with Flight (Part 4)

Further Reading

Appendix