import xorq as xo
import xorq.expr.datatypes as dt
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics import mean_absolute_error
from xorq.caching import ParquetStorage
from xorq.common.utils.defer_utils import deferred_read_parquet
from xorq.common.utils.import_utils import import_python
from xorq.expr.ml import (
deferred_fit_transform_series_sklearn,
train_test_splits,
)
# Import the helper modules we used in Part 1
m = import_python(xo.options.pins.get_path("hackernews_lib"))
o = import_python(xo.options.pins.get_path("openai_lib"))
Transform with TF-IDF
Overview
In this tutorial (Part 2 of our series), you’ll learn how to:
- Load the labeled HackerNews data from Part 1
- Split the data into training and testing sets
- Apply TF-IDF vectorization to the text data
- Build deferred pipelines with fit-transform operations
- Prepare the transformed data for model training
Prerequisites
- Completed Part 1 of this tutorial series (Data Ingestion and Model-Assisted Labeling)
- Python 3.8+ installed on your system
- Basic understanding of feature engineering concepts
Installation and Imports
First, make sure you have the required packages:
pip install xorq pandas scikit-learn
Then import the necessary modules (these are the imports shown in the code block at the top of this page).
Setting Up the TF-IDF Transformation
Now, let’s define our TF-IDF transformer using xorq’s deferred operations:
# Define which column we want to transform
transform_col = "title"
transformed_col = f"{transform_col}_transformed"

# Create a deferred TF-IDF transformer
deferred_fit_transform_tfidf = deferred_fit_transform_series_sklearn(
    col=transform_col,
    cls=TfidfVectorizer,
    return_type=dt.Array(dt.float64),
)
The deferred_fit_transform_series_sklearn function creates a deferred operation that will be applied within our data pipeline. We’re using scikit-learn’s TfidfVectorizer to transform the text data into numerical features.
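If you haven’t used TF-IDF before, a quick standalone look at what TfidfVectorizer produces can help. The toy titles below are made up for illustration; in our pipeline the fit and transform are deferred and run inside xorq.

from sklearn.feature_extraction.text import TfidfVectorizer

# Two made-up titles, just to show the shape of the output
titles = [
    "Show HN: a fast network traffic analyzer",
    "Ask HN: how do you analyze network traffic?",
]

vectorizer = TfidfVectorizer()
matrix = vectorizer.fit_transform(titles)      # sparse matrix: one row per title
print(matrix.shape)                            # (2, size_of_learned_vocabulary)
print(vectorizer.get_feature_names_out()[:5])  # a few of the learned vocabulary terms

Each row is a vector of TF-IDF weights over the learned vocabulary, which is what ends up in our title_transformed column as an array of float64 values.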
Loading the Labeled Data
Let’s initialize the backend and load our data from Part 1:
# Initialize the backend
con = xo.connect()
storage = ParquetStorage(source=con)

# Define the input dataset name
name = "hn-fetcher-input-large"

# Load the data
raw_expr = (
    deferred_read_parquet(
        con,
        xo.options.pins.get_path(name),
        name,
    )
    .pipe(m.do_hackernews_fetcher_udxf)
)
# Process the data as we did in Part 1
processed_expr = (
    raw_expr
    .filter(xo._.text.notnull())
    .pipe(o.do_hackernews_sentiment_udxf, con=con)
    .cache(storage=ParquetStorage(con))
    .filter(~xo._.sentiment.contains("ERROR"))
    .mutate(
        sentiment_int=xo._.sentiment.cases(
            {"POSITIVE": 2, "NEUTRAL": 1, "NEGATIVE": 0}.items()
        ).cast(int)
    )
)
Splitting Data into Train and Test Sets
Before applying our TF-IDF transformation, we’ll split the data into training and testing sets:
# Split into train (60%) and test (40%) sets
(train_expr, test_expr) = processed_expr.pipe(
    train_test_splits,
    unique_key="id",
    test_sizes=(0.6, 0.4),
    random_seed=42,
)
The train_test_splits function in xorq ensures a proper split of your data. We use the id field as a unique key so that each record is assigned to exactly one of the train and test sets, and the random seed makes the split reproducible.
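To see why a unique key plus a seed gives a reproducible split, here is a rough sketch of the general idea behind key-based splitting (not xorq’s exact implementation): hash the key together with the seed, then bucket the result.

import hashlib

def split_bucket(unique_key, random_seed=42, n_buckets=10_000):
    # Hash the key together with the seed, then map it to a bucket
    digest = hashlib.sha256(f"{unique_key}-{random_seed}".encode()).hexdigest()
    return int(digest, 16) % n_buckets

def in_train(unique_key, train_fraction=0.6):
    # A record lands in train when its bucket falls below the train threshold
    return split_bucket(unique_key) < train_fraction * 10_000

# The same id always maps to the same bucket, so the assignment is stable
print(in_train(8863), in_train(8863))  # same answer on every run

The deferred plan in the Appendix shows xorq doing something similar: hashing the id, taking the result modulo 10000, and comparing it against the 0.6 threshold.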
Building the Deferred TF-IDF Pipeline
Now let’s build our deferred TF-IDF pipeline:
# Create the deferred TF-IDF model, transform operation
(deferred_tfidf_model, tfidf_udaf, deferred_tfidf_transform) = (
    deferred_fit_transform_tfidf(
        train_expr,
        storage=storage,
    )
)

# Apply the transformation to the training data
train_tfidf_transformed = train_expr.mutate(
    **{transformed_col: deferred_tfidf_transform.on_expr}
)
When we call deferred_fit_transform_tfidf(train_expr, storage=storage), we get back three important objects:
- deferred_tfidf_model: an Ibis expression that references the TF-IDF model that will be fit on the training data.
- tfidf_udaf: a User-Defined Aggregate Function (UDAF) that encapsulates the TF-IDF fit over the training rows. UDAFs reduce their input, returning one row and one column.
- deferred_tfidf_transform: a deferred Scalar User-Defined Function (UDF) that applies the fitted vectorizer derived from the training data. Scalar UDFs perform row-wise transforms, outputting a single column; the toy sketch below illustrates the aggregate-versus-scalar distinction.
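If the UDAF/UDF distinction is new, here is a toy, xorq-free illustration of the two shapes of function involved: an aggregate reduces a whole column to a single value (the fitted state), while a scalar function maps each row to one output value.

titles = ["Show HN: tool", "Ask HN: question", "Launch HN: startup"]

def fit_like_an_aggregate(rows):
    # Many rows in, one value out: here, a tiny "fitted" vocabulary
    return sorted({word for row in rows for word in row.lower().split()})

def transform_like_a_scalar(row, vocab):
    # One row in, one value out: a vector of term counts over the fitted vocabulary
    words = row.lower().split()
    return [float(words.count(term)) for term in vocab]

vocab = fit_like_an_aggregate(titles)
print(transform_like_a_scalar(titles[0], vocab))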
The key benefit of this approach is that we define the transformation once, but can apply it consistently to multiple datasets (train, test, or new data). The model is fit only on the training data, avoiding any information leakage.
When we execute the pipeline, xorq manages the computation efficiently, ensuring that the model is fit only once and that cached results are reused for all transformations.
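The same fit-on-train, apply-everywhere discipline looks like this in plain scikit-learn terms (a sketch with made-up titles, outside of xorq):

from sklearn.feature_extraction.text import TfidfVectorizer

train_titles = ["Show HN: my new database", "Ask HN: favorite text editor?"]
test_titles = ["Show HN: yet another text editor"]

vectorizer = TfidfVectorizer()
X_train = vectorizer.fit_transform(train_titles)  # vocabulary and IDF weights learned here only
X_test = vectorizer.transform(test_titles)        # reuses the training vocabulary, no refitting
print(X_train.shape[1] == X_test.shape[1])        # True: both splits share one feature space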
Applying the Transformation to Test Data
Similarly, we can apply the same transformation to the test data:
# Apply the transformation to the test data
test_tfidf_transformed = test_expr.mutate(
    **{transformed_col: deferred_tfidf_transform.on_expr}
)
Notice that we’re using the same deferred_tfidf_transform UDF, which applies the vectorizer fitted on the training data. This ensures that our test data is transformed in exactly the same way, without information leakage.
Executing and Examining the Transformed Data
Now let’s execute our pipeline and examine the transformed data:
# Execute the transformation on the training data
train_transformed = train_tfidf_transformed.execute()
# Check the dimensions and structure of the transformed data
print(f"Number of training samples: {len(train_transformed)}")
print(f"Original title example: {train_transformed['title'].iloc[0]}")
print(f"Vector dimensions: {len(train_transformed[transformed_col].iloc[0])}")
# You can also examine specific feature values if needed
print(f"First 5 feature values: {train_transformed[transformed_col].iloc[0][:5]}")
Number of training samples: 381
Original title example: Show HN: Xenoeye – high performance network traffic analyzer (OSS, *flow-based)
Vector dimensions: 1489
First 5 feature values: [np.float64(0.0), np.float64(0.0), np.float64(0.0), np.float64(0.0), np.float64(0.0)]
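As a small follow-on (assuming the train_transformed DataFrame from above), you can stack the per-row TF-IDF arrays into a single 2-D matrix, which is the shape a model in Part 3 will expect:

import numpy as np

# Each row of the transformed column is a list of floats; stack them into a matrix
X_train = np.vstack(train_transformed[transformed_col].to_numpy())
y_train = train_transformed["sentiment_int"].to_numpy()

print(X_train.shape, y_train.shape)  # (n_training_samples, n_tfidf_features), (n_training_samples,)
print(f"Share of non-zero entries: {np.count_nonzero(X_train) / X_train.size:.4f}")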
Summary
Congratulations! In this second part of our tutorial series, you’ve:
1. Set up a deferred TF-IDF transformation pipeline
2. Split your data into training and testing sets
3. Applied the TF-IDF transformation to both sets
4. Examined the transformed data
5. Saved the transformed data for future use
Next Steps
In the next tutorial (Part 3), we’ll use the transformed data to train an XGBoost model for sentiment classification. We’ll build on the same deferred pipeline pattern to create an end-to-end machine learning workflow.
Appendix
Deferred
r0 := FlightUDXF: ibis_rbr-placeholder_ywruclahnbafvb
by string
id int64
parent float64
text string
time int64
type string
kids array<int64>
descendants float64
score float64
title string
url string
sentiment string
r1 := RemoteTable[r0, name=HackerNewsSentimentAnalyzer]
by string
id int64
parent float64
text string
time int64
type string
kids array<int64>
descendants float64
score float64
title string
url string
sentiment string
r2 := CachedNode[r1, strategy=modification_time, parquet=True, source=let-126580202190208]
by string
id int64
parent float64
text string
time int64
type string
kids array<int64>
descendants float64
score float64
title string
url string
sentiment string
r3 := Filter[r2]
Not(StringContains(haystack=r2.sentiment, needle='ERROR'))
r4 := Project[r3]
by: r3.by
id: r3.id
parent: r3.parent
text: r3.text
time: r3.time
type: r3.type
kids: r3.kids
descendants: r3.descendants
score: r3.score
title: r3.title
url: r3.url
sentiment: r3.sentiment
sentiment_int: Cast(SimpleCase(base=r3.sentiment, cases=['POSITIVE', 'NEUTRAL', 'NEGATIVE'],
results=[2, 1, 0], default=Cast(None, to=int8)), to=int64)
r5 := Filter[r4]
Cast(0, to=decimal(38, 9)) * 10000 <= Abs(Hash(StringConcat([StringJoin([Cast(r4.id, to=string)],
sep=','), '16157387885063800092468972531095442600227637936690303362357377535130907802013']))) % 10000
Abs(Hash(StringConcat([StringJoin([Cast(r4.id, to=string)], sep=','),
'16157387885063800092468972531095442600227637936690303362357377535130907802013']))) % 10000 < Cast(0.6,
to=decimal(38, 9)) * 10000
r6 := Aggregate[r5]
metrics:
_c9160aa7a9f22006e1547fa8f80d1b91(title): _c9160aa7a9f22006e1547fa8f80d1b91(r5.title)
CachedNode[r6, strategy=modification_time, parquet=True, source=let-126580202190208]
_c9160aa7a9f22006e1547fa8f80d1b91(title) binary
Complete Example Code
import xorq as xo
import xorq.expr.datatypes as dt
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics import mean_absolute_error
from xorq.caching import ParquetStorage
from xorq.common.utils.defer_utils import deferred_read_parquet
from xorq.common.utils.import_utils import import_python
from xorq.expr.ml import (
deferred_fit_transform_series_sklearn,
train_test_splits,
)
# Import the helper modules we used in Part 1
m = import_python(xo.options.pins.get_path("hackernews_lib"))
o = import_python(xo.options.pins.get_path("openai_lib"))

transform_col = "title"
transformed_col = f"{transform_col}_transformed"

# Create a deferred TF-IDF transformer
deferred_fit_transform_tfidf = deferred_fit_transform_series_sklearn(
    col=transform_col,
    cls=TfidfVectorizer,
    return_type=dt.Array(dt.float64),
)

con = xo.connect()
storage = ParquetStorage(source=con)

# Define the input dataset name
name = "hn-fetcher-input-large"

# Load the data
raw_expr = (
    deferred_read_parquet(
        con,
        xo.options.pins.get_path(name),
        name,
    )
    .pipe(m.do_hackernews_fetcher_udxf)
)

# Process the data as we did in Part 1
processed_expr = (
    raw_expr
    .filter(xo._.text.notnull())
    .pipe(o.do_hackernews_sentiment_udxf, con=con)
    .cache(storage=ParquetStorage(con))
    .filter(~xo._.sentiment.contains("ERROR"))
    .mutate(
        sentiment_int=xo._.sentiment.cases(
            {"POSITIVE": 2, "NEUTRAL": 1, "NEGATIVE": 0}.items()
        ).cast(int)
    )
)

(train_expr, test_expr) = processed_expr.pipe(
    train_test_splits,
    unique_key="id",
    test_sizes=(0.6, 0.4),
    random_seed=42,
)

(deferred_tfidf_model, tfidf_udaf, deferred_tfidf_transform) = (
    deferred_fit_transform_tfidf(
        train_expr,
        storage=storage,
    )
)

train_tfidf_transformed = train_expr.mutate(
    **{transformed_col: deferred_tfidf_transform.on_expr}
)

test_tfidf_transformed = test_expr.mutate(
    **{transformed_col: deferred_tfidf_transform.on_expr}
)

train_transformed = train_tfidf_transformed.execute()

print(f"Number of training samples: {len(train_transformed)}")
print(f"Original title example: {train_transformed['title'].iloc[0]}")
print(f"Vector dimensions: {len(train_transformed[transformed_col].iloc[0])}")

# You can also examine specific feature values if needed
print(f"First 5 feature values: {train_transformed[transformed_col].iloc[0][:5]}")