XGBoost Training
Overview
In this tutorial (Part 3 of our series), you'll learn how to:
- Define deferred model training and prediction operations
- Split data into train and test sets
- Train an XGBoost model with TF-IDF features
- Make predictions on both training and test data
- Evaluate model performance
Prerequisites
- Completed Part 1 (Data Ingestion and Model-Assisted Labeling)
- Completed Part 2 (Feature Engineering with TF-IDF)
- Python 3.8+ installed on your system
- Basic understanding of machine learning concepts
Installation and Imports
First, ensure you have the required packages:
pip install xorq pandas scikit-learn xgboost
Then import the necessary modules:

import pandas as pd
import toolz
import xgboost as xgb
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics import mean_absolute_error, confusion_matrix

import xorq as xo
import xorq.vendor.ibis.expr.datatypes as dt
from xorq.caching import ParquetStorage
from xorq.common.utils.defer_utils import deferred_read_parquet
from xorq.common.utils.import_utils import import_python
from xorq.expr.ml import (
    deferred_fit_predict,
    deferred_fit_transform_series_sklearn,
    train_test_splits,
)

# Import the helper modules we used in previous parts
m = import_python(xo.options.pins.get_path("hackernews_lib"))
o = import_python(xo.options.pins.get_path("openai_lib"))
Setting Up Deferred Operations
Defining Model Training and Prediction Functions
Let’s define functions for training and making predictions with XGBoost:
@toolz.curry
def fit_xgboost_model(feature_df, target_series, seed=0):
    # Configure a multi-class XGBoost model; multi:softmax predicts a class index
    xgb_r = xgb.XGBRegressor(
        objective="multi:softmax",
        num_class=3,
        eval_metric=mean_absolute_error,
        max_depth=6,
        n_estimators=10,
        seed=seed,
    )
    # Each row holds one list-valued feature vector; expand it into a 2-D frame
    X = pd.DataFrame(feature_df.squeeze().tolist())
    xgb_r.fit(X, target_series)
    return xgb_r

@toolz.curry
def predict_xgboost_model(model, df):
    # Expand the list-valued feature column the same way before predicting
    return model.predict(df.squeeze().tolist())
The fit_xgboost_model function trains an XGBoost model on the provided features and target, while the predict_xgboost_model function applies the trained model to new data to generate predictions.
Note that we're using multi:softmax as the objective function since we have three sentiment classes (POSITIVE=2, NEUTRAL=1, NEGATIVE=0).
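As a quick smoke test, these functions can be called directly on hand-made data. The toy vectors below are hypothetical and only illustrate the expected input shape: a single column whose cells are list-valued feature vectors, which is what the TF-IDF transform produces.

# Hypothetical toy inputs for illustration only
toy_features = pd.DataFrame(
    {"title_transformed": [[0.1, 0.0, 0.3], [0.0, 0.2, 0.1], [0.4, 0.0, 0.0]]}
)
toy_target = pd.Series([2, 1, 0], name="sentiment_int")

toy_model = fit_xgboost_model(toy_features, toy_target)
print(predict_xgboost_model(toy_model, toy_features))  # class indices as floats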
Now, let’s set up our deferred operations for both the TF-IDF transformation and XGBoost prediction:
# Define column names
= "title"
transform_col = (transformed_col,) = (f"{transform_col}_transformed",)
features = "sentiment_int"
target = f"{target}_predicted"
target_predicted
# Create a deferred TF-IDF transformer (same as in Part 2)
deferred_fit_transform_tfidf = deferred_fit_transform_series_sklearn(
    col=transform_col,
    cls=TfidfVectorizer,
    return_type=dt.Array(dt.float64),
)
# Create a deferred XGBoost model
deferred_fit_predict_xgb = deferred_fit_predict(
    target=target,
    features=list(features),
    fit=fit_xgboost_model,
    predict=predict_xgboost_model,
    return_type=dt.float32,
)
The deferred_fit_predict function creates a deferred operation that will:
1. Fit a model using the specified fit function on the training data
2. Create a prediction operation that can be applied to any dataset
Unlike the TF-IDF transformation (which we covered in detail in Part 2), model training is implemented as an aggregate function rather than a UDXF. This is because training aggregates across the entire dataset to learn patterns, while transformation is applied row by row. The sketch below illustrates the fit-once, predict-anywhere pattern.
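Conceptually (a plain-Python sketch of the pattern, not xorq's actual API), separating the one-time fit step from the reusable predict step looks like this:

def make_deferred_fit_predict(fit, predict):
    # Fit once on training data, then return a reusable predictor
    # that can be applied to any other dataset
    def fit_on(train_features, train_target):
        model = fit(train_features, train_target)
        return lambda features: predict(model, features)
    return fit_on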
Loading and Preparing the Data
Let’s load and prepare our data, similar to what we did in the previous parts:
# Initialize the backend
con = xo.connect()
storage = ParquetStorage(source=con)

# Define the input dataset name
name = "hn-fetcher-input-large"

# Load and process the data (similar to Parts 1 and 2)
raw_expr = (
    deferred_read_parquet(
        con,
        xo.options.pins.get_path(name),
        name,
    )
    .pipe(m.do_hackernews_fetcher_udxf)
)
t = (
    raw_expr
    .filter(xo._.text.notnull())
    .pipe(o.do_hackernews_sentiment_udxf, con=con)
    .cache(storage=ParquetStorage(con))
    .filter(~xo._.sentiment.contains("ERROR"))
    .mutate(
        sentiment_int=xo._.sentiment.cases(
            {"POSITIVE": 2, "NEUTRAL": 1, "NEGATIVE": 0}.items()
        ).cast(int)
    )
)
Splitting the Data into Train and Test Sets
Before training our model, we’ll split the data into training and testing sets:
# Split into train (60%) and test (40%) sets
(train_expr, test_expr) = t.pipe(
    train_test_splits,
    unique_key="id",
    test_sizes=(0.6, 0.4),
    random_seed=42,
)
The train_test_splits function in xorq ensures a proper and deterministic split of your data. It works by hashing the unique key (id in our case) into an integer, then applying a modulo operation to assign each record to a bucket.
Having a unique key field is essential, as it allows xorq to deterministically order the table and assign records to either the training or test set. This approach ensures that:
1. The same record always ends up in the same split when using the same random seed
2. The split is distributed evenly across the dataset
3. Records are not duplicated across splits
The sketch after this list illustrates the hash-and-modulo idea.
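As an illustration only (xorq's actual hash function and bucketing live inside train_test_splits; the helper below is hypothetical):

import hashlib

def assign_split(unique_key, random_seed=42, n_buckets=10):
    # Hash the key together with the seed into a stable integer
    digest = hashlib.md5(f"{random_seed}-{unique_key}".encode()).hexdigest()
    bucket = int(digest, 16) % n_buckets
    # Buckets 0-5 -> train (60%); buckets 6-9 -> test (40%)
    return "train" if bucket < 6 else "test"

print(assign_split(8863))  # the same id and seed always map to the same split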
Applying TF-IDF Transformation
Let’s apply the TF-IDF transformation to our training data:
# Create the deferred TF-IDF transformation
(deferred_tfidf_model, tfidf_udaf, deferred_tfidf_transform) = (
    deferred_fit_transform_tfidf(
        train_expr,
        storage=storage,
    )
)

# Apply the transformation to the training data
train_tfidf_transformed = train_expr.mutate(
    **{transformed_col: deferred_tfidf_transform.on_expr}
)
We’re using the same TF-IDF approach we explored in Part 2, fitting on the training data to create a vocabulary and then transforming the documents into numerical feature vectors. This step is necessary to convert the text into a format that our XGBoost model can process.
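For intuition, the equivalent fit-then-transform step in plain scikit-learn (with made-up documents) looks like this:

from sklearn.feature_extraction.text import TfidfVectorizer

docs = ["Show HN: a tiny parser", "Ask HN: favorite editor?", "A tiny editor"]
vectorizer = TfidfVectorizer()
vectorizer.fit(docs)                  # learn the vocabulary from training text
vectors = vectorizer.transform(docs)  # map each document to a numeric vector
print(vectors.toarray().shape)        # (3, number_of_vocabulary_terms)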
Training the XGBoost Model
Now, let’s train our XGBoost model on the transformed training data:
# Create the deferred XGBoost model and prediction operation
(deferred_xgb_model, xgb_udaf, deferred_xgb_predict) = deferred_fit_predict_xgb(
    train_tfidf_transformed,
    storage=storage,
)

# Apply predictions to the training data
train_xgb_predicted = (
    train_tfidf_transformed
    .into_backend(xo.connect())  # extra into_backend (see note below)
    .mutate(**{target_predicted: deferred_xgb_predict.on_expr})
)
Unlike the transformation step, model training is implemented as an aggregate function (xgb_udaf). This is an important distinction:
- Transformation (UDF): operates row by row, applying the same function to each record independently
- Training (UDAF): aggregates across the entire dataset, learning patterns from all records collectively
The pandas sketch below draws the same contrast.
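The row-wise versus whole-dataset distinction also exists in plain pandas, which may help build intuition (a loose analogy, not xorq's implementation):

import pandas as pd

df = pd.DataFrame({"x": [1.0, 2.0, 3.0, 4.0]})

# UDF-like transformation: each row is handled independently
df["x_scaled"] = df["x"].map(lambda v: v / 10)

# UDAF-like aggregation: one value learned from all rows, analogous
# to fitting a model over the entire training set
mean_x = df["x"].agg("mean")
print(df, mean_x)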
The deferred_fit_predict_xgb function returns three key components:
- deferred_xgb_model: an Expr that evaluates to the trained model
- xgb_udaf: the user-defined aggregate function (UDAF) that performs the training
- deferred_xgb_predict: the scalar UDF that takes an Expr as input (an ExprScalarUDF)
Making Predictions on Test Data
Similarly, we’ll apply both the TF-IDF transformation and XGBoost prediction to our test data:
# Apply TF-IDF transformation and XGBoost prediction to test data
test_xgb_predicted = (
    test_expr
    .mutate(**{transformed_col: deferred_tfidf_transform.on_expr})
    .into_backend(xo.connect(), name="stable-name")  # extra into_backend (see note below)
    .mutate(**{target_predicted: deferred_xgb_predict.on_expr})
)
Note the superfluous .into_backend(xo.connect()) calls above. They are currently necessary to ensure proper handling of data types during prediction, and they should become unnecessary once the underlying bug is fixed. See the GitHub issue for more information.
Evaluating Model Performance
Let’s execute our pipeline and evaluate the model’s performance:
# Execute the training and test predictions
train_results = train_xgb_predicted.execute()
test_results = test_xgb_predicted.execute()
# Print model performance statistics by sentiment class
print("Training Set Performance:")
print(train_results.groupby("sentiment_int").sentiment_int_predicted.describe().T)
print("\nTest Set Performance:")
print(test_results.groupby("sentiment_int").sentiment_int_predicted.describe().T)
# Calculate overall accuracy
train_accuracy = (train_results[target_predicted] == train_results[target]).mean()
test_accuracy = (test_results[target_predicted] == test_results[target]).mean()
print(f"\nTraining Accuracy: {train_accuracy:.4f}")
print(f"Test Accuracy: {test_accuracy:.4f}")
Summary and Next Steps
Congratulations! In this third part of our tutorial series, you've:
1. Created deferred operations for model training and prediction
2. Split data into training and testing sets
3. Applied TF-IDF transformation to convert text to features
4. Trained an XGBoost model for sentiment classification
5. Made predictions on both training and test data
6. Evaluated model performance
7. Applied the model to make predictions on unseen data
In the next tutorial (Part 4), we’ll explore how to deploy our trained model for real-time predictions using xorq’s Flight serving capabilities.
Further Reading
Appendix
Complete Example Code
import pandas as pd
import toolz
import xgboost as xgb
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics import mean_absolute_error, confusion_matrix
import xorq as xo
import xorq.vendor.ibis.expr.datatypes as dt
from xorq.caching import ParquetStorage
from xorq.common.utils.defer_utils import deferred_read_parquet
from xorq.common.utils.import_utils import import_python
from xorq.expr.ml import (
deferred_fit_predict,
deferred_fit_transform_series_sklearn,
train_test_splits,
)
# Import the helper modules
m = import_python(xo.options.pins.get_path("hackernews_lib"))
o = import_python(xo.options.pins.get_path("openai_lib"))

# Define model training and prediction functions
@toolz.curry
def fit_xgboost_model(feature_df, target_series, seed=0):
    xgb_r = xgb.XGBRegressor(
        objective="multi:softmax",
        num_class=3,
        eval_metric=mean_absolute_error,
        max_depth=6,
        n_estimators=10,
        seed=seed,
    )
    X = pd.DataFrame(feature_df.squeeze().tolist())
    xgb_r.fit(X, target_series)
    return xgb_r

@toolz.curry
def predict_xgboost_model(model, df):
    return model.predict(df.squeeze().tolist())

# Define column names
transform_col = "title"
features = (transformed_col,) = (f"{transform_col}_transformed",)
target = "sentiment_int"
target_predicted = f"{target}_predicted"

# Create deferred operations
deferred_fit_transform_tfidf = deferred_fit_transform_series_sklearn(
    col=transform_col,
    cls=TfidfVectorizer,
    return_type=dt.Array(dt.float64),
)

deferred_fit_predict_xgb = deferred_fit_predict(
    target=target,
    features=list(features),
    fit=fit_xgboost_model,
    predict=predict_xgboost_model,
    return_type=dt.float32,
)

# Initialize the backend
con = xo.connect()
storage = ParquetStorage(source=con)

# Load and process data
name = "hn-fetcher-input-large"
raw_expr = (
    deferred_read_parquet(
        con,
        xo.options.pins.get_path(name),
        name,
    )
    .pipe(m.do_hackernews_fetcher_udxf)
)

t = (
    raw_expr
    .filter(xo._.text.notnull())
    .pipe(o.do_hackernews_sentiment_udxf, con=con)
    .cache(storage=ParquetStorage(con))
    .filter(~xo._.sentiment.contains("ERROR"))
    .mutate(
        sentiment_int=xo._.sentiment.cases(
            {"POSITIVE": 2, "NEUTRAL": 1, "NEGATIVE": 0}.items()
        ).cast(int)
    )
)

# Split into train and test sets
(train_expr, test_expr) = t.pipe(
    train_test_splits,
    unique_key="id",
    test_sizes=(0.6, 0.4),
    random_seed=42,
)

# Apply TF-IDF transformation
(deferred_tfidf_model, tfidf_udaf, deferred_tfidf_transform) = (
    deferred_fit_transform_tfidf(
        train_expr,
        storage=storage,
    )
)
train_tfidf_transformed = train_expr.mutate(
    **{transformed_col: deferred_tfidf_transform.on_expr}
)

# Train XGBoost model
(deferred_xgb_model, xgb_udaf, deferred_xgb_predict) = deferred_fit_predict_xgb(
    train_tfidf_transformed,
    storage=storage,
)
train_xgb_predicted = (
    train_tfidf_transformed
    .into_backend(xo.connect())
    .mutate(**{target_predicted: deferred_xgb_predict.on_expr})
)

# Make predictions on test data
test_xgb_predicted = (
    test_expr
    .mutate(**{transformed_col: deferred_tfidf_transform.on_expr})
    .into_backend(xo.connect(), name="stable-name")
    .mutate(**{target_predicted: deferred_xgb_predict.on_expr})
)

# Execute and evaluate
train_results = train_xgb_predicted.execute()
test_results = test_xgb_predicted.execute()

# Calculate overall accuracy
train_accuracy = (train_results[target_predicted] == train_results[target]).mean()
test_accuracy = (test_results[target_predicted] == test_results[target]).mean()

print(f"Training Accuracy: {train_accuracy:.4f}")
print(f"Test Accuracy: {test_accuracy:.4f}")