Chapter 4. Working with Data and Feature Stores
Machine learning takes data and turns it into predictive logic. Data is essential to the process, can come from many sources, and must be processed to make it usable. Therefore, data management and processing are the most critical components of machine learning. Data can originate from different sources:
- Files: Data stored in local or cloud files
- Data warehouses: Databases hosting historical data transactions
- Online databases: SQL, NoSQL, graph, or time series databases hosting up-to-date transactional or application data
- Data streams: Intermediate storage hosting real-time events and messages (for passing data reliably between services)
- Online services: Any cloud service that can provide valuable data (this can include social, financial, government, and news services)
- Incoming messages: Asynchronous messages and notifications, which can arrive through email or any other messaging service (Slack, WhatsApp, Teams)
Source data is processed and stored as features for use in model training and model flows. In many cases, features are stored in two storage systems: one for batch access (training, batch prediction, and so on) and one for online retrieval (for real-time or online serving). As a result, there may be two separate data processing pipelines, one using batch processing and the other using real-time (stream) processing.
The data sources and processing logic will likely change over time, resulting in changes to the processed features and the model produced from that data. Therefore, applying versioning to the data, processing logic, and tracking data lineage are critical elements in any MLOps solution.
Delivery of accurate and high-quality production models requires high volumes of data and significant processing power. Processing of production data can be scaled using distributed analytics engines (Apache Spark, Dask, Google BigQuery, and more), stream processing technologies (like Apache Flink), or multistage data pipelines.
One of the mechanisms to automate integration with data sources, scalable batch and real-time data processing, data versioning, and feature management is to use a feature store. A feature store is a central hub for producing, sharing, and monitoring features. Feature stores are essential in modern MLOps implementations and will be described in further detail in this chapter.
Data Versioning and Lineage
Models and data products are derived from data. Therefore, collecting metadata and tracing the origin of the data allow better control and governance for data products. Furthermore, if you want to examine a specific version of a data product, you must understand the original data used to produce that product or model.
Data versioning, lineage, and metadata management are a set of essential MLOps practices that address the following:
- Data quality: Tracing data through an organization’s systems and collecting metadata and lineage information can help identify errors and inconsistencies. This makes it possible to take corrective action and improve data quality.
- Model reproducibility and traceability: Access to historical data versions allows us to reproduce model results and can be used for model debugging, troubleshooting, and trying out different parameter sets.
- Data governance and auditability: By understanding the origin and history of data, organizations can ensure that data follows expected policies and regulations, track sources of errors, and perform audits or investigations.
- Compliance: Data lineage can help organizations demonstrate compliance with regulations such as GDPR and HIPAA.
- Simpler data management: Metadata and lineage information enable better data discovery, mappings, profiling, integration, and migrations.
- Better collaboration: Data versioning and lineage can facilitate cooperation between data scientists and ML engineers by providing a clear and consistent view of the data used in ML models and when handling upgrades.
- Dependency tracking: Understanding how each data, parameter, or code change contributes to the results and providing insights into which data or model objects need to change due to data source modification.
How It Works
As shown in Figure 4-1, the data generation flow can be abstracted as a set of data sources and parameters that are used as inputs to a data processing (computation) task, which produces a collection of data products or artifacts. The output artifacts can be of different types: files, tables, machine learning models, charts, and so on.
The data tracking system records the information about the inputs (data sources and versions, parameters) and computation tasks (code, packages, resources, executing user, and more). Then, it adds it as metadata in the output artifacts. The metadata may include additional information like user-provided labels or tags, information about the data structure, schema, statistics, and so on. The metadata is usually not copied to each output artifact but is instead referenced (by a link) to eliminate data duplication.
As shown in Figure 4-2, output artifacts from the first task (for example, data preparation) can be used as data inputs to the following tasks (for example, model training, testing).
When accessing a data product through a user interface or an SDK, the metadata lets us see the exact data sources, parameters, and the full details of the computation task. We can also trace the progress of the data generated in a multistage flow and examine all the additional metadata.
Every time the data processing task runs, a new version of the output artifacts is created (see Figure 4-3). Each version is marked with a unique version identifier (commit id) and can also be tagged with a meaningful version name, such as master, development, staging, production, and so on. This is similar to the Git flow when versioning source code.
Let’s assume you run a specific task repeatedly, say every hour, with the same inputs and parameters, or with small changes that do not affect the output data. This can produce vast piles of redundant data, where multiple versions store the same content. Many data versioning solutions implement content deduplication to address this challenge.
When an artifact is produced, a cryptographic hash of its content is calculated (for example, using the MD5 or SHA1 algorithm), which uniquely represents the content. The hash value is then compared with older versions or used as an index in the storage system, so the content is stored only once.
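A minimal sketch of this content-addressed approach (the helper and directory names here are hypothetical, not taken from any specific tool):

import hashlib
import os
import shutil

def store_dedup(path: str, store_dir: str = ".artifact_store") -> str:
    """Store a file under a content-derived name; skip the copy if it already exists."""
    sha = hashlib.sha1()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(8192), b""):
            sha.update(chunk)
    digest = sha.hexdigest()
    os.makedirs(store_dir, exist_ok=True)
    target = os.path.join(store_dir, digest)
    if not os.path.exists(target):   # identical content is stored only once
        shutil.copyfile(path, target)
    return digest                    # the digest doubles as the version identifier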
Since data versioning solutions track various attributes in addition to the source data (code, parameters, users, resources, and more), they must be well integrated with the source control system (Git) and the job or pipeline execution framework. Otherwise, the user must manually glue the frameworks together and provide the reference metadata to record alongside the data.
Many frameworks (MLflow, MLRun, and more) provide a logging API, where the user calls a log_artifact() method that automatically records and versions the new data along with the code and execution metadata. Many also offer an autologging solution that does not require code instrumentation: it automatically determines which data and metadata to save and version by understanding the user code and the ML framework’s capabilities.
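For instance, MLflow’s autologging can capture parameters, metrics, and the trained model without explicit logging calls; a minimal sketch (the exact attributes captured depend on the ML framework):

import mlflow
from sklearn.datasets import make_regression
from sklearn.ensemble import RandomForestRegressor

# Enable autologging: parameters, metrics, and the fitted model are recorded
# without explicit log_param/log_metric/log_artifact calls.
mlflow.autolog()

X, y = make_regression(n_samples=200, n_features=5, random_state=42)
with mlflow.start_run():
    RandomForestRegressor(n_estimators=50).fit(X, y)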
Common ML Data Versioning Tools
A set of open source and commercial frameworks for data versioning exists. This book focuses on explaining and comparing the open source options DVC, Pachyderm, MLflow, and MLRun.
Data Version Control
Data Version Control (DVC) started as a data versioning tool for ML and was extended to support basic ML workflow automation and experiment management. It takes advantage of the existing software engineering toolset you’re already familiar with (Git, your IDE, CI/CD, and so on).
DVC works just like Git (with similar commands) but for large file-based datasets and model artifacts. This is its main advantage but also its weakness. DVC stores the data content in files or an object storage (AWS S3, GCS, Azure Blob, and so on) and keeps a reference to those objects in a file (.dvc), which is stored in the Git repository.
The following command will add a local model file (model.pkl) to the data versioning system:
dvc add model.pkl
DVC will copy the content of the model.pkl file into a new file with a new name (derived from the content MD5 hash value) and place it under the .dvc/ directory. It also creates a file named model.pkl.dvc, which points to that content file. Next, the new metadata file needs to be tracked by Git, the content should be ignored, and the changes should be committed. This is done by typing the following commands:
git add model.pkl.dvc .gitignore
git commit -m "Add raw data"
When you want to upload the data to your remote storage, you will need to set up a remote object repository (not shown here) and use the DVC push command:
dvc push
The data flow is illustrated in Figure 4-4.
As you can see from the example, DVC provides reliable synchronization between code and file data objects, but it requires manual configuration and does not store extended metadata about the execution, workflow, parameters, and so on. Instead, DVC handles parameters and results metrics using JSON or YAML files stored and versioned alongside the code.
Users can define workflow stages that wrap an executable (for example, a Python program) and specify the parameters (-p), the file inputs or dependencies (-d), and the outputs (-o) of that executable (see Example 4-1).
Example 4-1. Adding a workflow step in DVC
dvc stage add -n featurize \
    -p featurize.max_features,featurize.ngrams \
    -d src/featurization.py -d data/prepared \
    -o data/features \
    python src/featurization.py data/prepared data/features
When you run the dvc repro command, it evaluates whether the dependencies have changed, executes the required steps, and registers the outputs.
DVC does not use an experiment database. It uses Git as the database, and every execution or parameter combination is mapped to a unique Git commit. Furthermore, DVC is focused on local development. Therefore, using it at scale or in a containerized or distributed workflow environment can be challenging and require scripting and manual integrations.
In summary, DVC is an excellent tool for versioning large data artifacts and mapping them to Git commits in a local development environment. In addition, DVC implements data deduplication to reduce the actual storage footprint. On the other hand, DVC is command-line oriented (Git flow) and has limited capabilities for running in production, executing pipelines, and tracking extended attributes and structured data. It also comes with a minimal UI (studio).
Pachyderm
Pachyderm is a data pipeline and versioning tool built on a containerized infrastructure. It provides a versioned file system and allows users to construct multistage pipelines, where each stage runs on a container, accepts input data (as files), and generates output data files.
Pachyderm provides a versioned data repository that can be implemented over object storage (for example, AWS S3, Minio, GCS) and accessed through a file API or the SDK/CLI. Every data commit or change is logged similarly to Git. Data is deduplicated to preserve space.
The Pachyderm data pipeline executes containers and mounts a slice of the repository into the container (under the /pfs/ directory). The container reads files, processes them, and writes the outputs back into the Pachyderm repository.
Example 4-2 shows a simple pipeline definition that takes all the data from the data repository on the master branch, runs the word count logic (using the specified container image), and writes the output to the out repository.
Example 4-2. Pachyderm pipeline example
pipeline:
  name: 'count'
description: 'Count the number of lines in a csv file'
input:
  pfs:
    repo: 'data'
    branch: 'master'
    glob: '/'
transform:
  image: alpine:3.14.0
  cmd: ['/bin/sh']
  stdin: ['wc -l /pfs/data/iris.csv > /pfs/out/line_count.txt']
Pipelines can be triggered every time the input data changes, and data can be processed incrementally (only new files will be passed into the container process). This can save time and resources.
Pachyderm has a nice user interface for managing pipelines and exploring the data. See Figure 4-5.
Pachyderm can work with large or continuous structured data sources by breaking the data into smaller CSV or JSON files.
In summary, Pachyderm is an excellent tool for building versioned data pipelines, where the code is simple enough to read and write files. It handles deduplication and incremental processing. However, it requires separate tracking of the source code (runs prebuilt images), execution or experiment parameters, metadata, metrics, and more.
MLflow Tracking
MLflow is an open source platform for managing the end-to-end machine learning lifecycle. One of its core components is MLflow Tracking, which provides an API and UI for logging machine learning runs, their inputs and outputs, and visualizing the results. MLflow Tracking runs are executions of some data science code. Each run records the following information:
- Code version: Git commit hash used for the run.
- Start and end time: The start and end time of the run.
- Source: The name of the file that launched the run, or the project name and entry point for the run if running from an MLflow Project.
- Parameters: Key-value input parameters of your choice. Both keys and values are strings.
- Metrics: Key-value metrics, where the value is numeric. MLflow records and lets you visualize the metric’s full history.
- Artifacts: Output files in any format. For example, you can record images (such as PNGs), models (such as a pickled scikit-learn model), and data files (such as a Parquet file) as artifacts.
MLflow Tracking is not a complete data versioning solution since it doesn’t support features such as data lineage (recording data inputs and which data was used to create a new data item) or deduplication. However, it enables logging and indexing the data outputs of every run along with the source code, parameters, and some execution details. MLflow can be manually integrated with other tools like DVC to track data and experiments.
MLflow’s advantage is tracking the data outputs with additional metadata about the code and parameters and visualizing or comparing them in a graphical UI. However, this is not free. The user code needs to be instrumented with the MLflow Tracking code.
Example 4-3 demonstrates a partial code snippet that tracks a run using the MLflow API. First, the command line arguments are parsed manually and the input data is passed as a string URL, just like any other parameter. Then, the loading and transformation of the data are done manually.
After the logic (data preparation, training, and so on) is executed, the user logs the tags, input parameters, output metrics, and data artifacts (dataset and model) using the MLflow log commands.
Example 4-3. MLflow Tracking code example
if __name__ == "__main__":
    # parse the input parameters
    parser = argparse.ArgumentParser()
    parser.add_argument("--data", help="input data path", type=str)
    parser.add_argument('--dropout', type=float, default=0.0, help='dropout ratio')
    parser.add_argument("--lr", type=float, default=0.001, help='learning rate')
    args = parser.parse_args()

    # Read the csv file
    try:
        data = pd.read_csv(args.data)
    except Exception as e:
        raise ValueError(f"Unable to read the training CSV, {e}")

    # additional initialization code ...

    with mlflow.start_run():
        # training code ...

        # log experiment tags, parameters and result metrics
        mlflow.set_tag("framework", "sklearn")
        mlflow.log_param("dropout", args.dropout)
        mlflow.log_param("lr", args.lr)
        mlflow.log_metric("rmse", rmse)
        mlflow.log_metric("r2", r2)
        mlflow.log_metric("mae", mae)

        # log data and model artifacts
        mlflow.log_artifacts(out_data_path, "output_data")
        mlflow.sklearn.log_model(
            model, "model", registered_model_name="ElasticnetWineModel"
        )
MLflow sends the run information to the tracking server and stores the data elements in local files or remote objects (for example, in S3). The run information can be viewed or compared in the MLflow user interface (see Figure 4-6).
MLflow does not manage or version data objects. Run is the primary element, and you cannot directly access or search data objects and artifacts. In addition, there is no lineage tracking, which means there is no tracking of which data objects were used to produce a new data object or artifact. When you run a pipeline, you cannot see the artifacts from the different steps in one place or chain output from one stage to the input of the next step.
With MLflow, the storage capacity can become significant since every run saves the outputs in a new file directory, even when nothing has changed. There is no data deduplication like in the other frameworks.
In summary, MLflow Tracking is an excellent tool for tracking and comparing ML experiment results in a development environment. In addition, MLflow is easy to install and use. However, it is not a data tracking or versioning system and may require significant storage capacity. Furthermore, MLflow requires developers to add custom code and MLOps teams to add glue logic to fit it into production deployments and CI/CD workflows.
MLRun
MLRun is an open source MLOps orchestration framework with multiple sub-components that handle the complete ML lifecycle. Data objects are first-class citizens in MLRun and are well integrated with the other components to provide a seamless experience and automation.
Whereas most frameworks manage file data objects, MLRun supports a variety of data objects (data stores, items/files, datasets, streams, models, feature sets, feature vectors, charts, and more), each with unique metadata, actions, and viewers.
Every object in MLRun has a type, a unique version ID, tags (named versions like development, production, and so on), user-defined labels (for grouping and searching across objects), and relations to other objects, and it belongs to a project. For example, a run object has links to the source and output data objects and to function (code) objects, forming a graph of relations.
Figure 4-7 shows the run screen with information tabs for general and code attributes, data input objects, data/artifact output objects, result metrics, auto-collected logs, and so on. Users can view the information from different perspectives. For example, look at all the datasets in the project (regardless of which run generated them).
MLRun data objects and artifacts carry detailed metadata, including information on how they were produced (by whom, when, code, framework, and so on), which data sources were used to create them, and type-specific attributes such as schema, statistics, preview, and more. The metadata is auto-generated, which provides better observability and eliminates the need for additional glue logic.
Note
MLflow users can continue using the MLflow tracking APIs, and MLRun will automatically register the logged data, metadata, and models as production artifacts along with additional operational metadata and context.
MLRun provides an extensive API/SDK for tracking and searching across data and experiments. However, the real power is that it can deliver most of the features and automation without requiring additional coding.
Example 4-4 shows a function that accepts input data and parameters and generates output data and results. Note that, unlike the previous examples, the code doesn’t include argument parsing, data loading, conversion, logging, and so on.
Example 4-4. MLRun code example
def data_preparation(dataset: pd.DataFrame, test_size=0.2):
    # perform processing on the dataset
    dataset = clean_df(dataset).dropna(how="any", axis="rows")
    dataset = dataset.drop(columns=["key", "pickup_datetime"])
    train, test = train_test_split(dataset, test_size=test_size)
    return train, test, "fare_amount"
When the function is executed with an input data object URL or path (a file, a remote object, or a complex type), the object is automatically loaded into the function, for example, using the AWS boto drivers to access S3 objects or the BigQuery drivers to access a BigQuery table. The data is then converted to the expected format (a DataFrame) and injected into the user code.
MLRun can auto-detect the returned value types (for example, train and test are DataFrames) and store them in the best form, along with auto-generated metadata, links to the job details and data input objects, and versioning information. If the data repeats itself, it is deduplicated and stored only once to preserve storage space.
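A rough sketch of how a function like the one in Example 4-4 might be executed with MLRun; the file name, image, and data path are placeholders, and the exact options should be checked against the MLRun documentation:

import mlrun

# Wrap the module that contains data_preparation() as an MLRun function
# (the file name and image below are hypothetical).
fn = mlrun.code_to_function(
    name="data-prep", filename="data_prep.py", kind="job", image="mlrun/mlrun"
)

# The input URL is loaded and converted to a DataFrame before the handler runs;
# returned DataFrames are logged as versioned dataset artifacts automatically.
run = fn.run(
    handler="data_preparation",
    inputs={"dataset": "s3://my-bucket/taxi_rides.csv"},
    params={"test_size": 0.2},
    local=True,
)
print(run.outputs)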
Data objects have type-specific visualizations in the UI and client SDK, regardless of how and where they are stored; for example, tabular formats with table metadata (schema, stats, and more) for datasets, or interactive graphics for chart objects (see Figures 4-8 and 4-9).
In summary, MLRun is a complete MLOps orchestration framework with a significant focus on data management, movement, versioning, and automation. In addition, MLRun has a rich object model that covers different types of data and execution objects (functions, runs, workflows, and more), how they are related, and how they are used. MLRun focuses on abstraction and automation to reduce development and deployment efforts. However, MLRun is not a general data management and versioning solution, and its value is maximized when used in the context of MLOps.
Other Frameworks
Some tools, such as Delta Lake and lakeFS, handle data lake versioning. However, those tools are not focused on the ML lifecycle and may require integration to make them useful for MLOps.
Cloud vendors provide solutions that are usually tightly bound to their internal services. For example, see Amazon SageMaker ML Lineage Tracking and Azure ML datasets.
Data Preparation and Analysis at Scale
Data processing is used extensively across the data, ML, and application pipelines. When working with production data, there is a need to support more extensive scale and performance, and, in some cases, handle data as it arrives in real time.
Practices that work during interactive development, for example, storing the data in a CSV file and reading it into the notebook, don’t work with gigabytes or terabytes of data. They require distributed or parallel data processing approaches.
The general architecture for distributed data processing is the same across frameworks, with differences in how data is distributed and collected and which APIs are used. Data is partitioned across multiple compute nodes, processing requests or queries arrive at one or more nodes for local processing, and the results are collected and merged to produce a single answer. In addition, complex queries may shuffle data between nodes or execute multiple processing and movement steps.
Figure 4-10 demonstrates how distributed data processing works using the map-reduce approach for counting words in a document.
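As a simplified, single-process illustration of the map-reduce idea behind Figure 4-10 (the sample documents are made up):

from collections import Counter
from functools import reduce

documents = ["the quick brown fox", "the lazy dog", "the quick dog"]

# Map: each "node" counts words in its own partition of the data
mapped = [Counter(doc.split()) for doc in documents]

# Reduce: partial counts from all partitions are merged into a single result
totals = reduce(lambda a, b: a + b, mapped)
print(totals.most_common(3))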
Structured and Unstructured Data Transformations
Data can be structured, meaning it conforms to a specific format or structure and often has a predefined schema or data model. Structured data can be a database table or files with a structured layout (for example, CSV, Excel, JSON, XML, Parquet). However, most of the world’s data is unstructured, usually more complex, and more difficult to process than structured data. This includes free text, logs, web pages, images, video, and audio.
Here are some examples of analytic transformations that can be performed on structured data:
- Filtering: Selecting a subset of the data based on certain criteria, such as a specific date range or specific values in a column.
- Sorting: Ordering the data based on one or more columns, such as sorting by date or by a specific value.
- Grouping: Organizing the data into groups based on one or more columns, such as grouping by product category or by city.
- Aggregation: Calculating summary statistics, such as count, sum, average, maximum, and standard deviation, for one or more columns.
- Joining: Combining data from multiple tables or datasets based on common columns, such as joining a table of sales data with a table of customer data.
- Mapping: Mapping values from one or more columns to new column values using user-defined operations or code. Stateful mapping can calculate new values based on original values and accumulated state from older entries (for example, the time passed since the last login).
- Time series analysis: Analyzing or aggregating data over time, such as identifying trends, patterns, or anomalies.
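A small pandas sketch of several of these transformations, using made-up column names:

import pandas as pd

sales = pd.DataFrame({
    "city": ["NY", "NY", "SF", "SF"],
    "product": ["a", "b", "a", "b"],
    "amount": [10.0, 20.0, 15.0, 5.0],
})
customers = pd.DataFrame({"city": ["NY", "SF"], "region": ["east", "west"]})

filtered = sales[sales["amount"] > 5]                 # filtering
by_city = filtered.groupby("city")["amount"].sum()    # grouping + aggregation
joined = sales.merge(customers, on="city")            # joining
joined["amount_k"] = joined["amount"] / 1000          # mapping to a new column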
The following techniques can be used to process unstructured data or convert it to structured data:
- Text mining: Using NLP techniques to extract meaning and insights from text data, such as sentiment, entities, and topics.
- Computer vision: Using image and video processing techniques to extract information from visual data, for example through object recognition, facial recognition, and image classification.
- Audio and speech recognition: Using speech-to-text and audio processing techniques to extract meaning and insights from audio data, such as transcripts, sentiment, and speaker identification.
- Data extraction: Using techniques such as web scraping and data extraction to pull structured data out of unstructured data sources.
Various ML methods can be used to transform raw data into more meaningful data, for example:
- Clustering: Grouping similar data points based on certain characteristics, such as customers with similar purchasing habits
- Dimensionality reduction: Reducing the number of features in a dataset to make it easier to analyze or visualize
- Regression and classification: Predicting a class or a value based on other data features
- Imputing: Determining the expected value based on other data points in case of missing data
- Embedding: Representing a sequence of text, audio, or an image as a numeric vector that preserves the semantic relationships or contextual characteristics
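A brief scikit-learn sketch of a few of these methods (toy data, illustrative only):

import numpy as np
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
from sklearn.impute import SimpleImputer

X = np.array([[1.0, 2.0], [1.2, 1.9], [8.0, 8.1], [7.9, np.nan]])

X_filled = SimpleImputer(strategy="mean").fit_transform(X)           # imputing
labels = KMeans(n_clusters=2, n_init=10).fit_predict(X_filled)       # clustering
X_1d = PCA(n_components=1).fit_transform(X_filled)                   # dimensionality reduction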
Distributed Data Processing Architectures
Data processing architectures can be broken into three main categories:
- Interactive: A request or an update arrives, is processed, and a response is returned immediately; for example, SQL and NoSQL databases, data warehouses, key/value stores, graph databases, time series databases, and cloud services.
- Batch: A job is started on request or at a scheduled time, data is fetched and processed, and the results are written to the target storage upon completion. Batch jobs usually take longer to process. Example frameworks for batch data processing include Hadoop, Spark, and Dask.
- Streaming: Continuous processing of incoming requests or chunks of data, writing the results in real time to a target storage or message queue.
Batch processing is usually more efficient for processing large data quantities. However, interactive and stream data processing deliver faster responses with shorter delays. In addition, building data stream processing pipelines is usually more complex than batch jobs.
Some frameworks, like Spark, support several processing methods (interactive, batch, streaming), but they are usually optimal for only one of them.
Interactive Data Processing
Interactive systems are expected to respond immediately, so the requesting client or interactive dashboard will not need to wait. Furthermore, production services may depend on the reliability and robustness of the results. This is why interactive systems have simple APIs with limited data operations. In some cases, interactive systems provide mechanisms to define custom logic through stored procedures and user-defined functions (UDFs).
The main difference between the types of interactive data systems is how they index and store data to minimize response time. For example, NoSQL, in-memory, and key/value stores are optimized for retrieval by an index key (such as a user ID, product ID, and so on). The data is divided by the key (or a cryptographic hash of the key) and stored in different nodes. When a request arrives, it is passed to the specific node that manages the data for that key (user, product, and so on), which can quickly calculate and retrieve the answer. On the other hand, complex or cross-key calculations require coordination between all the nodes and take much longer.
Analytical databases and data warehouses are designed to traverse many records with different index key values. They organize the data in columns (by field) and use various columnar compression technologies and filtering and hinting tricks (like bloom filtering) to skip data blocks.
Other systems like time series or graph databases have more advanced data layouts and search strategies that combine multidimensional indexes and columnar compression. For example, accessing the time series metric object by the metric key (name) and using columnar compression technologies to scan or aggregate the individual values (by time).
Many interactive systems use the SQL language or SQL-like semantics to process data.
Some subcategories of notable data systems are listed in Table 4-1.
| Category | Description |
|---|---|
| Relational | Store structured data, accessed through SQL commands. Examples include MySQL, PostgreSQL, Oracle, and Microsoft SQL Server. |
| NoSQL | Nontabular databases. Examples include MongoDB, Cassandra, Redis, Elasticsearch, AWS DynamoDB, and Google BigTable. |
| Time series | Store and query time series data. Examples include InfluxDB, Prometheus, and TimescaleDB. |
| Graph | Store and query data in a graph format. Examples include Neo4j and Titan. |
| Vector | Index and store high-dimensional vector embeddings for fast retrieval and similarity search. Examples include Chroma, Pinecone, Milvus, Weaviate, and pgvector. |
Analytical systems usually traverse and process larger datasets. As a result, they support more extensive transformations (filtering, grouping, joining, aggregating, mapping, and so on) and user-defined functions. In addition, some can process and aggregate data from other databases or data stored in files. For example, solutions like Spark SQL or PrestoDB have connectors to many data sources and can process queries that span many datasets stored in different systems.
One of the most popular distributed SQL-based analytical engines is PrestoDB and its follow-on project, Trino. Presto was initially developed by Facebook and contributed to open source. Later, it was forked into projects like Trino and commercial products such as the Amazon Athena cloud service. Trino has a long list of data connectors.
Figure 4-11 illustrates Presto and Trino architectures. Queries arrive through HTTP requests, are parsed, and are broken by the planner and the scheduler into smaller tasks that are processed and merged by the individual workers.
Batch Data Processing
Batch data processing is used when there is a need to process large amounts of data and run a sequence of data transformations, and the processing time is less of a concern. In batch processing, the data is read and broken into chunks passed to multiple workers for processing. Once the result is ready, it is written to the target system. Batch processing is often used to process large amounts of historical data and generate the dataset for training ML models.
One of the best-known batch data processing frameworks was Apache Hadoop, an open source software framework for distributed storage and large-scale processing of data-intensive tasks. Hadoop was initially developed by Yahoo! engineers and is based on the MapReduce programming model, which consists of two main functions: Map and Reduce. The Map function takes an input dataset and processes it into a set of intermediate key-value pairs, which are then grouped by key and processed by the Reduce function to produce the final output.
Hadoop has since been replaced with more modern and cloud-native architectures based on cloud object storage, containerized infrastructure, and computation frameworks such as Spark, Flink, Beam, Dask, and others.
An everyday use for batch processing is found in ETL tasks. ETL refers to extracting data from multiple sources, transforming it, and loading it into a target database, data warehouse, or data lake. ETL is a crucial step in the data integration process, as it allows organizations to extract, clean, and transform data from multiple sources into a single, centralized repository.
Batch-processing pipelines may be complex and have multiple steps and dependencies. Apache Airflow is one of the most popular open source frameworks for authoring, scheduling, and monitoring batch data pipelines.
Airflow was initially developed by Airbnb and is now maintained by the Apache Software Foundation. It provides a simple and easy-to-use interface for defining workflows as DAGs of tasks, where each task represents an individual processing step. The tasks can be written in Python and run in various environments, including locally, over Kubernetes, or in the cloud.
Airflow also provides a web-based user interface (see Figure 4-12) for managing and monitoring workflows, including the ability to see the status of each task, retry failed tasks, and manually trigger or schedule tasks. It also includes features for managing and organizing workflows, such as defining dependencies between tasks and setting up task retry logic.
Example 4-5 shows Python code that creates an Apache Airflow DAG that reads data from a CSV file, processes it, and writes it to a destination.
Example 4-5. Airflow data pipeline code example
import csv
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta


def process_data(**kwargs):
    ti = kwargs['ti']
    input_file = ti.xcom_pull(task_ids='read_file')
    processed_data = do_data_processing(input_file)
    return processed_data


def do_data_processing(input_file):
    # Placeholder function that performs data processing
    processed_data = input_file
    return processed_data


def read_csv_file(file_path):
    with open(file_path, 'r') as file:
        reader = csv.reader(file)
        return list(reader)


def write_csv_file(file_path, data):
    with open(file_path, 'w') as file:
        writer = csv.writer(file)
        writer.writerows(data)


def write_data(**kwargs):
    # Pull the processed data from XCom and write it to the destination file
    ti = kwargs['ti']
    write_csv_file('/path/to/output_file.csv', ti.xcom_pull(task_ids='process_data'))


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2021, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'data_processing_dag',
    default_args=default_args,
    description='A DAG that reads data from a CSV file, processes it'
                ', and writes it to a destination',
    schedule_interval=timedelta(hours=1),
)

read_file = PythonOperator(
    task_id='read_file',
    # the return value is automatically pushed to XCom
    python_callable=lambda: read_csv_file('/path/to/input_file.csv'),
    dag=dag,
)

process_data = PythonOperator(
    task_id='process_data',
    python_callable=process_data,
    provide_context=True,
    dag=dag,
)

write_file = PythonOperator(
    task_id='write_file',
    python_callable=write_data,
    provide_context=True,
    dag=dag,
)

read_file >> process_data >> write_file
There are several cloud-based batch data pipeline services such as AWS Glue, Google Cloud Composer (based on Airflow), and Azure Data Factory.
One of the disadvantages of Hadoop or other batch pipelines is the need to read data from disk, process it, and write it again to disk at every step. However, frameworks such as Spark and Dask know how to compile the processing pipeline into an optimal graph where tasks are done in memory where possible, which minimizes the IO to disk and maximizes performance.
Example 4-6 demonstrates Spark code that reads a CSV file, processes the data, and writes the result into a target file.
Example 4-6. PySpark data pipeline code example
from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder.appName("SimpleBatchProcessing").getOrCreate()

# Load a CSV file into a Spark DataFrame
df = spark.read.csv("/path/to/input_file.csv", header=True, inferSchema=True)

# Perform some data processing on the DataFrame
processed_df = df.groupBy("column_name").agg({"column_name": "mean"})

# Write the processed DataFrame to a new CSV file
processed_df.write.csv("/path/to/output_file.csv", header=True)

# Stop the Spark session
spark.stop()
Example 4-7 shows the same task implemented using Dask. The advantage of Dask is that its operations are very similar to pandas, which makes it familiar to data scientists. However, Spark is usually more scalable and robust.
Example 4-7. Dask data pipeline code example
import dask.dataframe as dd

# Load a CSV file into a Dask DataFrame
df = dd.read_csv('/path/to/input_file.csv')

# Perform some data processing on the DataFrame
processed_df = df.groupby('column_name').column_name.mean().compute()

# Write the processed DataFrame to a new CSV file
processed_df.to_csv('/path/to/output_file.csv', index=False)
You can see that the Spark and Dask examples are much simpler compared to the Airflow ones. However, Airflow can be more suitable for managing and tracing long, complex jobs.
Stream Processing
Stream processing enables scalable, fault-tolerant, and real-time data processing. It is often used in applications that process large amounts of data in real time, such as real-time analytics, fraud detection, or recommendations.
In stream processing, data and incoming events are pushed into a stream (queue) and read by one or more workers. The workers process the data sequentially, make transformations, aggregate results, and write the results into a database or an output stream. Unlike traditional message queues, stream processing occurs in order. For example, assume the stream contains two events: one for customer login and another for customer logout. Not processing them in order can lead to a broken state. Another example is a money deposit operation, followed by a withdrawal. The withdrawal may be declined if operations are processed in the wrong order.
Streams are designed to scale. They are broken into partitions, and each partition handles a specific set of data objects, so it will not violate the order. For example, a user activity stream is partitioned by the user ID so that a specific user’s activities will always be stored in the same partition and processed by the same worker.
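A minimal sketch of such a partition-routing rule (the hash choice and partition count are arbitrary):

import hashlib

def partition_for(key: str, num_partitions: int = 8) -> int:
    """Route all events for the same key (e.g., a user ID) to the same partition."""
    digest = hashlib.md5(key.encode()).hexdigest()
    return int(digest, 16) % num_partitions

print(partition_for("user-1001"))   # a given user always maps to the same partition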
Streams such as Kafka and AWS Kinesis differ from message queues like RabbitMQ, AMQP, Amazon SQS, and Google Pub/Sub. Message queues do not guarantee message ordering, but they do guarantee reliable delivery, whereas with streams the client manages delivery reliability. Streams, on the other hand, are much faster due to their more straightforward logic and the parallelism offered by partitions.
Figure 4-13 illustrates a streaming application in which clients publish data that is distributed between the individual partitions (based on a hash of the partition key). One worker is reading from each partition and processing the data. The worker can use a database to store the state on known intervals (checkpoints), so the state can be recovered in case of a failure, or the worker can free unused memory. Finally, the results can be written into a target database or an output stream.
Streams provide “at-least-once semantics.” Therefore, the same message may appear multiple times. A way to provide “exactly once” semantics (the same message is processed only once) is with the help of checkpoints. Streams are processed in order, and the state can be persisted after every micro-batch. In the case of a failure, the worker can restore the last checkpoint data (state), process the events from that point forward, and ignore older events.
Stream Processing Frameworks
Doing analytics on real-time streams differs from doing it in batch or with SQL. With streams, the workers can go over the data only once, in sequential order, and see only a portion of the data (the data in their partition). This is why real-time analytics frameworks such as Spark Streaming, Apache Flink, Apache Beam, and Apache NiFi focus on stream processing and implement the standard analytic and statistical methods in a stream-optimized way.
A typical scenario in stream processing is to aggregate values over time; for example, examining the total value of customer transactions in the last hour to detect fraud. Recalculating the total over all events for every new event is not feasible; it would take a considerable amount of time and memory. Instead, the values are grouped into windowed buckets, for example, six buckets or more, each holding the total per 10 minutes. The process sums the values of only the last six buckets and drops the oldest bucket every 10 minutes. Figure 4-14 illustrates overlapping sliding windows with a one-minute window duration and 30-second window periods.
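Before looking at the Beam example, here is a simplified, single-process sketch of the bucketing idea (the bucket count and rotation schedule are assumptions for illustration):

from collections import deque

class HourlyTotal:
    """Approximate a 1-hour running total using six 10-minute buckets."""
    def __init__(self, num_buckets: int = 6):
        self.buckets = deque([0.0] * num_buckets, maxlen=num_buckets)

    def add(self, amount: float):
        self.buckets[-1] += amount    # accumulate into the current bucket

    def rotate(self):
        self.buckets.append(0.0)      # every 10 minutes: drop the oldest bucket, open a new one

    def total(self) -> float:
        return sum(self.buckets)      # sum of the last six buckets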
Example 4-8 shows the Apache Beam code for defining such a window.
Example 4-8. Defining the sliding window using Apache Beam
import apache_beam as beam
from apache_beam import window

sliding_windowed_items = (
    items | 'window' >> beam.WindowInto(window.SlidingWindows(60, 30)))
Coding with stream processing frameworks requires advanced data engineering knowledge. This is why many users avoid real-time data, even though it can provide much better business value and more accurate model scoring results. Feature stores come to the rescue, as they can automatically generate the batch and the streaming pipeline from the same higher-level data processing logic.
Feature Stores
Feature stores are a factory and central repository for machine learning features. Feature stores handle the collection of raw data from various sources, the transformation pipeline, storage, cataloging, versioning, security, serving, and monitoring. They automate many processes described in this chapter, while accelerating production time and reducing engineering efforts. Feature stores form a shared catalog of production-ready features, enable collaboration and sharing between teams, and accelerate the innovation and delivery of new AI applications.
The first feature store implementations came from large service providers like Uber, Twitter, and Spotify. For those companies, AI is core to the business, and feature stores helped them accelerate the development and deployment of new AI applications and improve collaboration and reuse. Today there are multiple commercial and open source implementations to choose from.
Advanced feature stores provide the following capabilities:
- Data connectivity: Glueless integration with multiple offline (data lakes, data warehouses, databases, and so on) and online (streams, message queues, APIs, managed services, and so on) sources.
- Offline and online transformation: Some feature stores offer capabilities to automatically build and manage the batch and streaming pipelines from higher-level logic.
- Storage: Storing the generated features in an offline store (such as an object store) and an online store (usually a key/value database).
- Metadata management: Auto-generating, storing, and managing all feature metadata, including lineage, schemas, statistics, labels, and more.
- Versioning: Managing multiple versions of each feature and the process of promoting features from development to production and integrating with CI/CD.
- Generating and managing feature vectors: Correctly joining multiple features into a single dataset for use in training or serving applications.
- Central cataloging: Providing centralized access to generate, label, or search features.
- Security and governance: Controlling access to features and raw data, and logging feature access.
- Easy-to-use UI and SDK: Simple access through APIs and a user interface that abstracts the underlying complexity, visualizes features, and makes them usable by data scientists.
- Monitoring and high availability: Monitoring the assets and data processing tasks automatically while reliably recovering from failures.
- Feature validation and analysis: Executing various data processing tasks, automatically or as initiated by the user, to validate feature correctness or to generate a deep analysis of features, correlations, and so on.
You should thoroughly compare capabilities before choosing a feature store. For example, many have very partial functionality, may focus on cataloging features, or lack automated transformations, data management at scale, and real-time functionality. These capabilities provide the most significant value in accelerating time to production.
Feature Store Architecture and Usage
Figure 4-15 illustrates a feature store’s general architecture and usage. Raw data is ingested and transformed into features, and features are cataloged and served to different applications (training, serving, monitoring). APIs and a UI allow data scientists, data engineers, and ML engineers to update, search, monitor, and use features.
The core components of a feature store are:
- Transformation layer: Converts raw offline or online data into features and stores them in both an online (key/value) and offline (object) store.
- Storage layer: Stores multiple versions of a feature in feature tables (feature sets) and manages the data lifecycle (create, append, delete, monitor, and secure the data). The storage layer stores each feature in two forms: offline for training and analysis and online for serving and monitoring.
- Feature retrieval: Accepts requests for multiple features (feature vectors) and other properties (such as time ranges and event data), and produces an offline data snapshot for training or a real-time vector for serving.
- Metadata management and cataloging: Stores the feature definitions, metadata, labels, and relations.
Ingestion and Transformation Service
This chapter has discussed the complexities of implementing large-scale processing for batch and real-time data, data versioning, and metadata management. Feature stores aim to reduce that complexity through abstraction and automation. With modern feature stores, data pipelines are described using high-level transformation logic. This logic is converted to the underlying processing engine semantics and deployed as a continuous and production-grade service, saving significant engineering efforts.
Pipeline implementation is different for local development (using packages like pandas), large-scale offline data (using batch processing), and real-time data (using stream processing). The advantage of a feature store that supports automated transformations is that it uses one definition for all three deployment modes and eliminates the reengineering involved in porting data pipelines from one method to another. In some feature stores, the data pipeline technology will be determined by the data sources, whether offline (data lakes, data warehouses, databases, and so on) or online (streams, message queues, APIs, managed services, and others).
Feature stores implement the data ingestion and transformation on groups of features (called feature sets or feature groups) that originate from the same source; for example, all the features extracted from a credit card transaction log. Feature sets take data from offline or online sources, build a list of features through a set of transformations, and store the resulting features along with the associated metadata and statistics.
Figure 4-16 illustrates the transformation service (feature set). Once the data is ingested from the source, it passes through a graph (DAG) of transformations, and the resulting features are written into the offline and online stores.
Examples of transformation (by data type):
- Structured: Filter, group, join, aggregate, one-hot encode, map, extract, and classify
- Textual: Extract, parse, disassemble, detect entities, sentiments, and embeddings
- Visual (images and videos): Frame, resize, detect objects, crop, recolor, rotate, map, and classify
The generated transformation service should be production-grade and support auto-scaling, high availability, live upgrades, and more. In addition, it should support continuous data ingestion and processing. For example, new data may arrive continuously (for real time) or in scheduled intervals (for offline). Therefore, serverless function technologies are an excellent fit.
Feature Storage
The features are usually stored in two forms: offline storage for training and analytics applications and online storage for real-time serving and monitoring applications. See Figure 4-17.
The offline store holds all the historical data and often uses data lakes, object storage, or data warehouse technologies. For example, a common choice is to use compressed Parquet files stored in object storage like AWS S3.
The online store holds the most recent data and often uses NoSQL or key/value stores like Redis, AWS DynamoDB, Google BigTable, and others. The online store needs to support reading features in milliseconds.
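A minimal sketch of an online lookup against a key/value store such as Redis (the key layout and feature names are hypothetical):

import json
import redis  # assumes a running Redis instance and the redis-py client

r = redis.Redis(host="localhost", port=6379)

# Write the latest feature values for an entity (illustrative key layout)
r.set("features:user:1001", json.dumps({"txn_count_1h": 7, "avg_amount": 42.5}))

# Online serving path: a single key lookup, typically taking milliseconds or less
features = json.loads(r.get("features:user:1001"))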
Feature Retrieval (for Training and Serving)
Training, serving, and analysis applications require multiple features from multiple datasets and sources. In contrast, feature stores organize features in groups (called feature sets) based on their origin and entity (primary key such as a user id, product id, and so on).
Retrieving multiple features from different sources, times, and with different indexes can be a complex analytics task. Feature stores automatically determine the parameters required for the JOIN query based on the feature metadata, entity names, and user request data. In addition, when the datasets are transactional (records are marked with a timestamp), the join operation needs to take time correctness and time travel into account and return only the values known at the time of the event (also referred to as an as-of join).
Offline feature sets can be produced through SQL queries generated by the feature store. However, for real-time serving applications that need to respond in milliseconds, this creates considerable overhead, and other real-time methods are used. In addition, time-based features (such as the number of requests in the last hour) cannot be fully precalculated and require special handling to generate an accurate result (for example, by combining precalculated time-windowed data with ad hoc last-mile calculations).
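As an illustration of time correctness, the following sketch uses pandas merge_asof to join each labeled event with the latest feature values known at that event's time (column names are made up):

import pandas as pd

labels = pd.DataFrame({
    "event_time": pd.to_datetime(["2023-01-01 10:05", "2023-01-01 11:20"]),
    "user_id": [1, 2], "label": [0, 1],
}).sort_values("event_time")

features = pd.DataFrame({
    "event_time": pd.to_datetime(["2023-01-01 10:00", "2023-01-01 11:00"]),
    "user_id": [1, 2], "txn_count_1h": [3, 9],
}).sort_values("event_time")

# For each label row, take the latest feature value known at (or before) that time
training_df = pd.merge_asof(labels, features, on="event_time", by="user_id")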
Figure 4-18 illustrates the feature retrieval flow with two separate engines, one for offline retrieval and the other for real-time retrieval. Note that in the case of offline, the dataset is snapshotted or preserved in a new dataset to allow data lineage tracking and explainability.
The get_offline_features request can accept event data to base the query on, a valid time range (for example, if we want to train the model based on data from the last month), and which features and columns should be returned (for example, whether to include the index, time, or label columns). It then initiates a local or serverless analytics job that computes the results and returns the feature vector dataset.
In real-time retrieval, the system initializes the retrieval service once (configuring a local or remote real-time analytics function) to save time on requests. Then, requests arrive with the entity keys (taken from the event data) and return a result vector. In addition, some feature stores support real-time imputing (replacing missing or NaN data with statistical feature values taken from the feature metadata).
Feature Stores Solutions and Usage Example
Feature stores started as internal platforms at leading service providers (such as Uber, Spotify, and Twitter), but now many open source and commercial feature store solutions are on the market. However, as in every important new technology space, there are significant functionality differences between those solutions; you need to be aware of them so you can choose the right one.
The most notable and essential difference is whether the feature store platform manages the data (transformation) pipeline for you and whether it supports both offline and real-time (streaming) pipelines. As you’ve read in this chapter, building and managing a scalable data pipeline is the major challenge. If you are forced to do it manually, it significantly undermines the value of a feature store.
Table 4-2 compares the leading feature store solutions:
| Category | Feast | Tecton | MLRun | SageMaker | Vertex AI | Databricks | HopsWorks |
|---|---|---|---|---|---|---|---|
| Open source | Yes | No | Yes | No | No | No | Yes |
| Managed option | No | Major clouds | Cloud + on-prem | On AWS | On GCP | Major clouds | Cloud + on-prem |
| Offline pipelines | No | Yes | Yes | No | No | No | Yes |
| Real-time pipelines | No | Yes | Yes | No | No | No | No |
| Feature retrieval | Yes | Yes | Yes | Yes | Yes | Yes | Yes |
| Engines | Spark | Spark | Python, Dask, Spark, Nuclio | None | Spark | Spark | Spark, Flink |
| Feature analytics | No | Yes | Yes | No | No | No | Yes |
| Versioning and lineage | No | Yes | Yes | No | No | No | Yes |
| Feature security | No | Yes | Yes | Yes | No | No | No |
| Monitoring | No | Yes | Yes | No | No | No | Yes |
| Glueless training and serving | No | No | Yes | No | No | No | Yes |
The following sections will demonstrate how feature stores are used with the two leading open source frameworks: Feast and MLRun. Note that MLRun is more fully featured and provides offline and online transformation services (based on MLRun’s serverless engines) along with many other unique features.
Using Feast Feature Store
Feast does not provide a transformation service. Data should be prepared upfront and stored in a supported source (like S3, GCS, BigQuery). Feast registers the source dataset and its metadata (schema, entity, and so on) in a FeatureView object, as shown in Example 4-9.
Example 4-9. Defining Feast FeatureView (source: Feast)
# Read data from parquet files. Parquet is convenient for local development mode.
# For production, you can use your favorite DWH, such as BigQuery. See Feast
# documentation for more info.
driver_hourly_stats = FileSource(
    name="driver_hourly_stats_source",
    path="/content/feature_repo/data/driver_stats.parquet",
    timestamp_field="event_timestamp",
    created_timestamp_column="created",
)

# Define an entity for the driver. You can think of entity as a primary key used to
# fetch features.
driver = Entity(name="driver", join_keys=["driver_id"])

# Our parquet files contain sample data that includes a driver_id column, timestamps
# and three feature column. Here we define a Feature View that will allow us to serve
# this data to our model online.
driver_hourly_stats_view = FeatureView(
    name="driver_hourly_stats",
    entities=[driver],
    ttl=timedelta(days=1),
    schema=[
        Field(name="conv_rate", dtype=Float32),
        Field(name="acc_rate", dtype=Float32),
        Field(name="avg_daily_trips", dtype=Int64),
    ],
    online=True,
    source=driver_hourly_stats,
    tags={},
)
Feast does not provide an online transformation or ingestion service. Instead, the user needs to run a materialization task to copy the offline features into the real-time store (database). Unfortunately, this also means that the data stored in the online store is inaccurate between materializations, and running materialization too frequently can result in significant computation overhead.
Running the materialization task via the SDK:
store
=
FeatureStore
(
repo_path
=
"."
)
store
.
materialize_incremental
(
datetime
.
now
())
The project may contain one or more feature views, and each is defined and materialized independently. Features can be retrieved from one or more feature views (which will initiate a JOIN operation).
To retrieve offline features (directly from the offline source), use the get_historical_features() API call, as shown in Example 4-10.
Example 4-10. Retrieve offline features with Feast (source: Feast)
# The entity dataframe is the dataframe we want to enrich with feature values
# see https://docs.feast.dev/getting-started/concepts/feature-retrieval for details
# for all entities in the offline store instead
entity_df = pd.DataFrame.from_dict(
    {
        # entity's join key -> entity values
        "driver_id": [1001, 1002, 1003],
        # "event_timestamp" (reserved key) -> timestamps
        "event_timestamp": [
            datetime(2021, 4, 12, 10, 59, 42),
            datetime(2021, 4, 12, 8, 12, 10),
            datetime(2021, 4, 12, 16, 40, 26),
        ],
        # (optional) label name -> label values. Feast does not process these
        "label_driver_reported_satisfaction": [1, 5, 3],
        # values we're using for an on-demand transformation
        "val_to_add": [1, 2, 3],
        "val_to_add_2": [10, 20, 30],
    }
)

store = FeatureStore(repo_path=".")

# retrieve offline features, feature names are specified with <view>:<feature-name>
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "driver_hourly_stats:conv_rate",
        "driver_hourly_stats:acc_rate",
        "driver_hourly_stats:avg_daily_trips",
        "transformed_conv_rate:conv_rate_plus_val1",
        "transformed_conv_rate:conv_rate_plus_val2",
    ],
).to_df()

print("----- Example features -----\n")
print(training_df.head())
To retrieve online features from the online store, use the get_online_features() API call, as shown in Example 4-11.
Example 4-11. Retrieve online features with Feast (source: Feast)
from pprint import pprint

from feast import FeatureStore

store = FeatureStore(repo_path=".")

feature_vector = store.get_online_features(
    features=[
        "driver_hourly_stats:acc_rate",
        "driver_hourly_stats:avg_daily_trips",
        "transformed_conv_rate:conv_rate_plus_val1",
        "transformed_conv_rate:conv_rate_plus_val2",
    ],
    entity_rows=[
        # {join_key: entity_value}
        {
            "driver_id": 1001,
            "val_to_add": 1000,
            "val_to_add_2": 2000,
        },
        {
            "driver_id": 1002,
            "val_to_add": 1001,
            "val_to_add_2": 2002,
        },
    ],
).to_dict()

pprint(feature_vector)

# results:
{'acc_rate': [0.86463862657547, 0.6959823369979858],
 'avg_daily_trips': [359, 311],
 'conv_rate_plus_val1': [1000.6638441681862, 1001.1511893719435],
 'conv_rate_plus_val2': [2000.6638441681862, 2002.1511893719435],
 'driver_id': [1001, 1002]}
Using MLRun Feature Store
MLRun supports the registration of existing sources (like Feast) or the definition of a data pipeline for transforming source data into features. When defining the data pipeline (called a graph), MLRun provisions the selected data processing engine based on the abstract user definitions. MLRun supports a few processing engines, including local Python, Dask, Spark, and Nuclio (a real-time serverless engine).
In MLRun, the pipeline writes to both the online and offline stores by default, so there is no need for separate materialization jobs, and the online and offline features are always in sync. In addition, MLRun can auto-detect the data schema, which makes feature set definitions simpler and more robust.
MLRun separates the definition of the feature set (a collection of features generated by the same pipeline) from the data source definitions. This way, you can use the same feature set in interactive development and in production: just swap the source from a local file in development to a database or a real-time Kafka stream in the production deployment, as in the sketch that follows.
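For illustration, here is a minimal sketch of that swap, assuming the transaction_set feature set defined in Example 4-12; the KafkaSource parameters (broker address and topic) are placeholders and may vary between MLRun versions:

import mlrun.feature_store as fs
from mlrun.datastore.sources import CSVSource, KafkaSource

# Development: run the data pipeline over a local CSV sample
dev_source = CSVSource("transactions_sample", path="./transactions_sample.csv")
fs.ingest(transaction_set, source=dev_source)

# Production: deploy the same pipeline as a real-time ingestion service
# that reads from a Kafka stream (broker and topic names are placeholders)
prod_source = KafkaSource(brokers="kafka-broker:9092", topics="transactions")
fs.deploy_ingestion_service_v2(transaction_set, prod_source)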
Example 4-12 shows how to define a feature set for processing credit card transactions to detect credit card fraud. The definition includes the entity, the timestamp, and a transformation graph that uses built-in operators and aggregations. Note that users can also add their own custom Python operators. See the full example.
The data pipeline consists of the following steps:
- Extracting the date components (hour, day of week)
- Mapping the age values
- One-hot encoding the transaction category and the gender
- Aggregating the amount (avg, sum, count, max over 2/12/24 hour time windows)
- Aggregating the transactions per category (over 14 day time windows)
- Writing the results to offline (Parquet) and online (NoSQL) targets
Example 4-12. Defining MLRun FeatureSet (source: MLRun)
import mlrun.feature_store as fs
from mlrun.feature_store.steps import DateExtractor, MapValues, OneHotEncoder

# Define the credit transactions FeatureSet
transaction_set = fs.FeatureSet(
    "transactions",
    entities=[fs.Entity("source")],
    timestamp_key='timestamp',
    description="transactions feature set",
)

# Define and add value mapping
main_categories = [
    "es_transportation", "es_health", "es_otherservices", "es_food",
    "es_hotelservices", "es_barsandrestaurants", "es_tech",
    "es_sportsandtoys", "es_wellnessandbeauty", "es_hyper",
    "es_fashion", "es_home", "es_contents", "es_travel", "es_leisure",
]

# One Hot Encode the newly defined mappings
# (transactions_data is the source dataframe loaded earlier in the full example)
one_hot_encoder_mapping = {
    'category': main_categories,
    'gender': list(transactions_data.gender.unique()),
}

# Define the data pipeline (graph) steps
transaction_set.graph \
    .to(DateExtractor(parts=['hour', 'day_of_week'], timestamp_col='timestamp')) \
    .to(MapValues(mapping={'age': {'U': '0'}}, with_original_features=True)) \
    .to(OneHotEncoder(mapping=one_hot_encoder_mapping))

# Add aggregations for 2, 12, and 24 hour time windows
transaction_set.add_aggregation(
    name='amount',
    column='amount',
    operations=['avg', 'sum', 'count', 'max'],
    windows=['2h', '12h', '24h'],
    period='1h',
)

# Add the category aggregations over a 14 day window
for category in main_categories:
    transaction_set.add_aggregation(
        name=category,
        column=f'category_{category}',
        operations=['count'],
        windows=['14d'],
        period='1d',
    )
The data pipeline can be visualized using transaction_set.plot(rankdir="LR", with_targets=True), as seen in Figure 4-19.
Once you have the feature set definition, you can test and debug it with the preview() method, which runs the data pipeline locally and lets you view the results:
df = fs.preview(transaction_set, transactions_data)
df.head()
When the feature set definition is done, you can deploy it as a production job that runs on demand, on a given schedule, or as a real-time pipeline. For batch ingestion, use the ingest() method. For real-time ingestion from HTTP endpoints or streams, use deploy_ingestion_service_v2(), which starts a real-time Nuclio serverless pipeline. See Example 4-13.
Example 4-13. Ingest data into MLRun FeatureSet (source: MLRun)
# Batch ingest the transactions dataset (from a CSV file) through the defined pipeline
source = CSVSource("mycsv", path="measurements.csv")
fs.ingest(transaction_set, source=source)

# Deploy a real-time pipeline with an HTTP API endpoint as the source
# MLRun supports other real-time sources like Kafka, Kinesis, etc.
source = HTTPSource()
fs.deploy_ingestion_service_v2(transaction_set, source)
You can watch the feature sets, their metadata, and statistics in the MLRun feature store UI. See Figure 4-20.
Feature retrieval in MLRun is done using the feature vector object. Feature vectors hold the definitions of the requested features and additional parameters. They also store calculated values such as the feature metadata and statistics, which can be helpful in training, serving, or monitoring tasks. For example, feature statistics are used for automated value imputing in the case of missing or NaN feature values, and for model drift monitoring in the serving application.
Feature vectors can be created, updated, and viewed in MLRun’s UI.
Users first define the feature vector; then they can use it to obtain offline or online features. Example 4-14 shows how to retrieve offline features using the get_offline_features() method.
Example 4-14. Get offline features from MLRun (source: MLRun)
# Import MLRun's Feature Store
import mlrun.feature_store as fstore
from mlrun.datastore.targets import ParquetTarget

# Define the list of features you will be using (<feature-set>.<feature>)
features = [
    'transactions.amount_max_2h',
    'transactions.amount_sum_2h',
    'transactions.amount_count_2h',
    'transactions.amount_avg_2h',
    'transactions.amount_max_12h',
]

# Define the feature vector name for future reference
fv_name = 'transactions-fraud'

# Define the feature vector using our Feature Store
transactions_fv = fstore.FeatureVector(
    fv_name,
    features,
    label_feature="labels.label",
    description='Predicting a fraudulent transaction',
)

# Save the feature vector definition in the Feature Store
transactions_fv.save()

# Get the offline feature vector as a dataframe and save the dataset to a parquet file
train_dataset = fstore.get_offline_features(transactions_fv, target=ParquetTarget())

# Preview the dataset
train_dataset.to_dataframe().tail(5)
To get real-time features, you first create a service (which initializes the real-time retrieval pipeline) and then call its get() method to request feature values in real time. The separation between the service creation (a one-time initialization) and the individual requests keeps request latencies low. In addition, MLRun supports automatic value imputing based on the feature's metadata and statistics, which can save significant development and computation overhead. See Example 4-15.
Example 4-15. Get online features from MLRun (source: MLRun)
# Create the online feature service, substitute NaN values with
# the feature mean value
svc = fstore.get_online_feature_service('transactions-fraud:latest',
                                        impute_policy={"*": "$mean"})

# Get sample feature vector
sample_fv = svc.get([{'source': 'C76780537'}])

# sample_fv Result
[{'amount_max_2h': 14.68,
  'amount_max_12h': 70.81,
  'amount_sum_2h': 14.68,
  'amount_count_2h': 1.0,
  'amount_avg_2h': 14.68}]
Note
MLRun’s feature store provides accurate real-time aggregations and low latency by combining precalculated values during the ingestion process with real-time calculations at feature request time.
The MLRun framework provides a model development and training pipeline, real-time serving pipelines, and integrated model monitoring. MLRun’s feature store is natively integrated with the other components, eliminating redundant glue logic, metadata translation, and so on, thus accelerating time to production.
Conclusion
With data management and processing being the most critical components of ML, it’s important to understand how to perform data-related tasks optimally. This chapter explored the recommended tools and practices for the various stages of working with your data. We started the chapter by discussing data versioning and lineage, which are essential for tracing data origin. Then we explored data preparation and analysis at scale, which is how data is handled so it can be used in production. In that section, we also discussed the architecture of interactive data processing solutions and the differences between batch data processing and real-time (stream) processing.
After reviewing the challenges of implementing these practices at scale, we moved on to present the concept of feature stores, which are a central repository for ML features. We covered the capabilities of a feature store, such as data connectivity and offline and online transformation. We also showed where the feature store fits in the MLOps pipeline, from ingesting raw data to supporting the use of that data in training, serving, monitoring, and more. Finally, we reviewed different feature store solutions and how to use them.
Critical Thinking Discussion Questions
- Which details does metadata provide? As data professionals, why do we need this information?
- Which open source data versioning tools are available? Which one could be a good fit for your organization?
- What’s the difference between batch processing and stream processing? When is each one used?
- How does a feature store simplify data management and processing practices? Which capabilities enable this?
- What are the differences between the Feast and the MLRun feature stores? Which one could be a good fit for your organization?
Exercises
- Choose an open source solution (DVC, Pachyderm, MLflow, or MLRun) and create a data versioning script or workflow that will record and version data and metadata.
- Create a prototype of a batch processing pipeline with the tool of your choice.
- Connect a Trino data connector to a data source.
- Train a demo model (you can use Hugging Face if you need a sample model) with a feature store.
- Create a feature set and ingestion pipeline in MLRun. You can use this project as a reference.