Chapter 1. Distributed Machine Learning Terminology and Concepts

Remember when data scientists ran their machine learning algorithms on datasets that fit in a laptop’s memory? Or generated their own data? It wasn’t because of a lack of data in the world; we had already entered the Zettabyte Era.1 For many, the data was there, but it was locked in the production systems that created, captured, copied, and processed data at a massive scale. Data scientists knew that gaining access to it would allow them to produce better, more profound machine learning models. But this wasn’t the only problem—what about computation? In many cases, data scientists didn’t have access to sufficient computation power or tools to support running machine learning algorithms on large datasets. Because of this, they had to sample their data and work with CSV or text files.

When the public cloud revolution occurred around 2016–2017, we could finally get hold of that desired computation capacity. All we needed was a credit card in one hand and a computer mouse in the other. One click of a button and boom, hundreds of machines were available to us! But we still lacked the proper open source tools to process huge amounts of data. There was a need for distributed compute and for automated tools with a healthy ecosystem.

The growth in digitalization, where businesses use digital technologies to change their business model and create new revenue streams and value-producing opportunities, increased data scientists’ frustration. Digitalization led to larger amounts of data being available, but data scientists couldn’t work with that data fast enough because they didn’t have the tools. The tedious process of waiting for days to try out one machine learning algorithm or to get a sample of production data blocked many from reaching their full potential. The need to improve and automate grew.

Small companies saw how larger ones had demonstrated a positive impact on their business by providing automated, personalized solutions to their customers, improving sentiment and boosting revenue. From a fantasy, machine learning became a hot commodity. Companies understood that to take advantage of it they would need more tools, and dedicated teams to build those tools in house, which in turn increased the demand for engineers to build reliable, scalable, accelerated, and high-performance tools to support machine learning workloads.

Netflix, the world-leading internet television network that streams hundreds of millions of hours of content daily, has stated that it uses machine learning pervasively, across all aspects of the business. This includes recommending personalized content for customers, optimizing the movie and show production processes in the Netflix studios, optimizing video and audio encoding, and improving advertising spending and ad creativity to reach new potential customers.

Machine learning has found applications in a broad array of industries, however, not just in technology-focused businesses. Data scientists and analytics teams at the multinational oil and gas company Shell plc leverage machine learning over large-scale datasets to support the business with insights regarding product opportunities and process optimizations and to test the effectiveness of different courses of action. One example is their inventory prediction model, which runs over 10,000 simulations across all parts and facilities, predicting demand and improving stocking. Shell also uses machine learning to power a recommendation engine for its customer loyalty program, Go+, which offers personalized offers and rewards to individual customers. This approach provides Shell with an enhanced engagement model that helps retain customers by catering to their specific needs.

Other industries use machine learning for fraud detection, recommendation systems, patient diagnosis, and more. Take a look at Figure 1-1 to get an idea of how you may be able to drive innovation using machine learning in your industry.

As these examples suggest, the ability to use large datasets to create solutions with proven business impact has been eye-opening for many companies looking to grow their business and improve revenue.

Figure 1-1. Some of the many uses of machine learning across industries

The computer science and engineering research communities have helped greatly in enabling scalable machine learning. In recent years, academic researchers have conducted hundreds if not thousands of studies on using machine learning, distributed computation, and databases and building smarter, more efficient algorithms to support distributed machine learning. As a result, general-purpose distributed platforms have emerged, such as the immensely popular Apache Spark. Apache Spark provides a scalable, general-purpose engine for analytics and machine learning workloads. At the same time, the teams behind various machine learning libraries built to support workloads on a single machine are tirelessly adding backend support for executing in a distributed setting. To list a few examples, additional capabilities to support distributed machine learning have been added to Google’s TensorFlow, which simplifies deep neural network workloads, and Facebook’s PyTorch, used for computer vision and natural language processing.

Throughout this book, we’ll focus on using Apache Spark, and I will show you how to bridge from it into distributed machine learning with TensorFlow and PyTorch. The book concludes with a discussion of machine learning deployment patterns in Chapter 10. To get you started, this chapter provides an introduction to the fundamental concepts, terminology, and building blocks of distributed machine learning. We will cover the basics of the following:

  • The machine learning workflow

  • Spark MLlib

  • Distributed computing

  • Distributed systems

Familiar with those concepts? We’ll shift to an introduction to Spark and PySpark in Chapter 2 and managing the machine learning lifecycle in Chapter 3.

Excited? Let’s dive in!

The Stages of the Machine Learning Workflow

Many applications today are driven by machine learning, using machine learning models to answer questions such as: How can my application automatically adapt itself to the customer’s needs? How can I automate this tedious process to enable my employees to do more with their time? How can I make sense of my pile of data without spending the whole year going over it? However, as data practitioners, we have just one question to answer: How can we enable the process to answer those questions?

The short answer is machine learning. A more comprehensive response is the machine learning workflow.

The machine learning workflow comprises a set of stages that help us reach the goal of having a machine learning model running in production solving a business problem. What is a machine learning model? Good question! A machine learning model is the output of a machine learning algorithm. From now on, we will refer to it simply as a model. The automation of this workflow is referred to as the machine learning pipeline. To improve the accuracy of the model, the workflow is iterative. This allows us to exercise complete control over the model—including automation, monitoring, and deployment—and its output.

The machine learning workflows consist of multiple stages, some of which can be skipped and some of which may be repeated:

  1. Collect and load/ingest data. The first stage is to collect the data required for the process and load it into the environment where you will execute your machine learning experiment.

  2. Explore and validate the data. Next, explore the data you have collected and evaluate its quality. This stage often involves statistical testing of how well the training data represents real-world events, its distribution, and the variety in the dataset. This is also referred to as exploratory data analysis (EDA).

  3. Clean/preprocess the data. After stage 2, you might reach the conclusion that the data is noisy. A noisy dataset is one with columns that do not contribute to the training at all—for example, rows with null values, or long string values. They require more processing power but don’t improve model accuracy. In this stage, data scientists will run statistical tests on the data to validate the correlation between features and analyze which features provide value as is, which require more preprocessing or engineering, and which are redundant.

  4. Extract features/perform feature engineering. The previous stage outputs the data columns as features. These are the descriptors of the data, used as the inputs to the machine learning model. Features in machine learning are often external to the original data, meaning we need to enrich the existing data with data from other sources. That requires us to develop the code to compute and produce these features and enrich the dataset with them before training the model. There are many ways to do this, and it often requires domain expertise. Alternatively, the features may already be present in another dataset, in which case all we need to do is merge the two datasets into one before training the model.

  5. Split the data into a training set and a validation set. The training set is used for training the machine learning model, and the validation set is for evaluating the performance of the model on unseen data.

  6. Train and tune the model. Feed the training data to the machine learning algorithm, and adjust the parameters to improve performance. Validate the outcome using the dedicated validation dataset. The validation process takes place in the development environment, either locally on your machine or in a development/experimentation environment in the cloud. The outcome of this stage is the model.

  7. Evaluate the model with test data. This is the last testing stage before the model is pushed to production. In this stage, you again measure the model’s performance on previously unseen data, this time testing it in a production-like setting. After this stage, you might want to go back and revisit stage 6.

  8. Deploy the model. During this stage, data scientists together with machine learning and production engineers package the model and deploy it to production with all its requirements.

  9. Monitor the model. In production, the model must constantly be monitored for drift (different types of drift are discussed in Chapter 10). It is crucial to continually evaluate the model’s value to the business and know when to replace it.

Each of these stages is repeatable on its own, and it may be that given a specific result you will want to complete the whole process again. For example, in the case of model drift, the data and the model are not representative of the business problem, and you will need to start the process over from the beginning.

Each stage is unique and highly dependent on the data, the system requirements, your knowledge, the algorithms in use, the existing infrastructure, and the desired outcome.

Stages 3 to 6 are often considered the experimental phase of machine learning. You will want to iterate repeatedly and produce multiple versions of the data and the model until you find the best version of the model.

To learn more about machine learning workflows and automating them using pipelines with TensorFlow and TensorBoard, read Building Machine Learning Pipelines by Hannes Hapke and Catherine Nelson (O’Reilly).

Tools and Technologies in the Machine Learning Pipeline

Figure 1-2 shows an overview of the machine learning pipeline and some of the tools that may be used in each stage.

Figure 1-2. A high-level view of the machine learning pipeline and tools used in each stage

We’ll use various tools and platforms in the tutorials in this book to complete the different stages (you can, of course, decide to replace any of these tools with a different tool of your choice). For experimenting with data ingestion, preprocessing, and feature engineering, we will use Jupyter, which provides a UI and a backend server. We’ll write the code in a notebook in the UI, and the backend will package it and send it over to the Spark engine.

In the model-building stage (corresponding to stages 6 and 7 in the machine learning workflow described earlier), we train, validate, and tune the model. We will use multiple servers and backends in this stage, including Jupyter, PyTorch, TensorFlow, Horovod, Petastorm, and MLflow, to orchestrate operations, cache the data, and transition the workflow from a Spark cluster to a deep learning cluster.

Finally, to deploy and serve our models, we will use Spark and MLflow. We’ll load the model from the MLflow storage server and serve it with Spark or as a REST API with a Python function.

Note

In most organizations, developing an end-to-end machine learning pipeline requires a dedicated team whose members have various sets of skills, as well as an integrated development environment (IDE) like PyCharm that provides rich developer tools and code autocompletion, dedicated scripts for continuous integration/continuous deployment (CI/CD), and much more. For the educational purposes of this book, we’ll stick to Jupyter notebooks.

Distributed Computing Models

Distributed computing is the use of distributed systems, where multiple machines work together as a single unit, to solve a computational problem. A program that runs inside such a system is called a distributed program, and the process of writing such a program is known as distributed programming. That’s what we’ll be doing in this book. Our goal is to find the best way to divide a problem into separate tasks that multiple machines can solve in parallel through message communication. There are different distributed computing models for machine learning, and we can categorize these into two groups: general-purpose models that we can tweak to support distributed machine learning applications, and dedicated computing models specifically designed for running machine learning algorithms.

General-Purpose Models

General-purpose distributed computing models allow users to write a custom data processing flow using a defined abstraction. Apache Spark is a general-purpose distributed computing engine that, at its heart, implements the MapReduce programming model and has more recently been extended to support the barrier model. You’ll learn about both of those in this section, as well as some other distributed computing models (MPI [Message Passing Interface] and shared memory) that are available with TensorFlow and PyTorch.

MapReduce

The MapReduce programming model was inspired by the functional programming paradigm. Google introduced the MapReduce algorithm in 2004, in a research paper discussing how its search engine processes large-scale data. As developers or data science practitioners, we specify a map function that processes a key/value pair to generate a set of intermediate key/value pairs and a reduce function that merges all the intermediate values associated with the same intermediate key. This approach is an extension of the split-apply-combine strategy for data analysis. In practice, every task is split into multiple map and reduce functions. Data is partitioned and distributed over various nodes/machines, and each chunk of data is processed on a dedicated node. Many solutions aim to preserve data locality as much as possible, where the partitioned data is local to the node processing it. A logic function is applied to the data on that node, then a shuffle operation of moving the data over the network is performed to combine the data from the different nodes, and a reduce operation is performed on the combined output from the mappers.

If necessary, another split-apply-combine round can then be performed on the output of the reducer. Examples of open source solutions that implement these concepts in one way or another include Apache Spark, Hadoop MapReduce, and Apache Flink. We’ll talk about the MapReduce model in more detail throughout the book.

MPI

Another interesting general-purpose distributed computing model is the Message Passing Interface (MPI) programming model. This is the most flexible model available today, and it was designed for high-performance, scalable, and portable distributed computing. MPI is a message passing interface that models a parallel program running on a distributed-memory system. It standardizes the communication between a set of processors by defining the data types that can be sent between the processors. Each processor has a unique identifier, and each communicator is a set of processors ordered in a specific topology. MPI is a low-level standard that can be implemented in hardware, compilers, wrappers, etc. Different versions of it have been implemented commercially by HP, Intel, Microsoft, and others.

MPI provides functions like MPI_Bcast, which broadcasts a message to all processors in the communicator, sharing the data; MPI_Alltoall, which sends all data to all nodes; and MPI_Reduce and MPI_Allreduce, which are similar to MapReduce and Apache Spark’s reduce functions. You can think of the interface as a set of building blocks for distributed frameworks that provides functionality for distributed machine learning.

The downside of MPI lies in its low-level permissiveness. It can be pretty labor-intensive and error-prone to execute and implement complex operations with MPI; it requires explicitly managing data type distribution, sending and receiving functionality, and fault tolerance and generally demands the developer to think about distributed arrays, data frames, hash tables, trees, etc. MPI is often used for deep learning workloads. Horovod’s core functions are based on MPI concepts, such as size, rank, local_rank, allreduce, allgather, and broadcast. For distributed computing, TensorFlow provides support for MPI as part of its communication protocol.

Barrier

A barrier is a synchronization method that is commonly used in parallel computing and is implemented in distributed computing frameworks like Apache Spark. A task or job is split into dependent stages or subtasks that need to be completed before processing can continue in the next stage. A barrier makes a group of machines stop at a certain point and wait for the rest of the machines to finish their computation before they can move on together to the next stage of the computation logic. Barrier models can be implemented in hardware and software, and stages can take many shapes, from directed acyclic graphs (DAGs) to trees or sequential operations.

Although it is a general-purpose distributed computing model, the barrier model enables diverse distributed machine learning algorithms. For example, in deep learning, each layer in the stacked artificial neural network is a stage, and each stage’s computation depends on the output of the previous stage. Barrier models enable the management of many layers of training in this case.

Shared memory

Shared memory models have a long history: they originated in operating systems like POSIX and Windows, where processes running on the same machine needed to communicate over a shared address space. Distributed shared memory models try to meet the same need when multiple nodes/users are communicating over the network in a distributed environment and require access to the same data from various machines. Today, there is no one partitioned global address space but an in-memory or fast database that provides strong consistency.

Strong consistency in a distributed shared memory environment means that all access to the data by all processes and nodes is consistent. Ensuring this is not an easy task. One of TensorFlow’s distributed strategies implements a shared memory model; you will learn all about it and its pros and cons in Chapter 8.

Dedicated Distributed Computing Models

Dedicated distributed computing models are models that were developed to support a specific need in the machine learning development cycle. Often, they leverage the general-purpose models as building blocks to construct a more user-friendly framework that data science practitioners can use out of the box. You can make use of them with TensorFlow and PyTorch.

One example of a dedicated distributed computing model for machine learning workloads is a parameter server. TensorFlow implements this as part of its distribution strategy for training models. Parameter servers leverage the shared memory approach: you have a dedicated group of servers guaranteeing strong consistency of the data that serves the workers with consistent information. The parameters are the weights and precomputed features the machine learning algorithm requires during its training and retraining lifecycle. In some cases, the parameters can fit into one machine’s memory, but in real-life use cases where there are billions of parameters, having a cluster of parameter servers is a requirement. We will look at this in more detail when we discuss the different TensorFlow distributed computing strategies in Chapter 8.

With research and industry investing heavily in distributed machine learning, it’s only a matter of time until more models are developed. It’s always a good practice to keep an eye out for new developments. Hopefully, by the end of this book, you will have all the tools and information you need to make an educated decision about which of the various distributed computing models to use and how to leverage it to meet your business’s technological needs.

Now that you are familiar with these concepts, let’s take a look at the bigger picture of distributed machine learning architecture and where each of these concepts comes into play.

Introduction to Distributed Systems Architecture

We’ll begin with a brief discussion of network topologies. Topologies are how we organize computers to form distributed systems. We can divide them into two types: physical topologies that describe how the computers are arranged and connected, and logical topologies that describe the way data flows in the system and how the computers exchange information over the network. Multinode computer topologies are typically scaled physically by adding more computers (aka nodes).

Engineers often discuss topologies in the final architecture conversation. The architectural requirements stem from the project’s goals, the data, the system’s behavior, and the existing software tools that are in use. For data scientists, the main goal is to define the distributed model training methods and how the models will be deployed and served.

Note

In some cases, you might identify a gap where the available software tools are not addressing the project’s needs well enough or are too complex to integrate into your solution, and you’ll need to source or develop new tools. This is an advanced scenario that we won’t be addressing in this book.

The nodes that form the topology of a distributed system are connected through a network in a specific architectural pattern designed to improve the load-handling capability and optimize speed and resource use. The architectural choices that are made in the design phase will impact each node’s role in the topology, how they communicate, and the overall system’s resilience to failure.

As well as understanding the physical topology of a distributed system, you should be aware of the differences between centralized and decentralized systems, how the machines interact, the modes of communication that are supported, and how the system handles security and failures. You can think of these as building blocks as you design your system.

Centralized Versus Decentralized Systems

In a centralized system, all the nodes depend on a single node to make decisions. Such a system benefits from greater control over decisions yet is more prone to failures as the decision-making node becomes a single point of failure that can bring the whole system down.

In a decentralized system topology, the nodes are independent and make their own decisions. Each node stores and operates on its own data, so there is no single point of failure. This means the system is more tolerant of faults; however, it also means that the decisions made by the individual nodes need to be coordinated and reconciled.

Decentralized systems can benefit from a multicloud/hybrid cloud architecture, where machine nodes reside in different regions and with different cloud providers. An example is a network of connected Internet of Things (IoT) devices: each device is independent but shares data over the network with other devices and/or the cloud, depending on its internet connectivity. The topology you choose will affect the communication methods the devices can use and their possible roles within the network. When it comes to training models, a decentralized approach means that every model is being trained on its own. We’ll talk more about the implications of this design decision later in this chapter, when we look at ensemble methods.

Interaction Models

The architecture of the interaction model defines how the nodes in a system communicate over the network, what their roles are in the system, and what responsibilities come along with those roles. We’ll look at three possible architectures in this section: client/server, peer-to-peer, and geo-distributed. There are other architectures in use and in development that are not covered here, but these are the ones you are most likely to encounter.

Client/server

In the client/server interaction model, there is a clear definition of responsibilities. Tasks are divided between clients, which issue requests, and servers, which provide the responses to those requests. The role of a node can change subject to the structure and needs of the system, but this depends on whether the servers are stateless (storing no state) or stateful (storing state that the next operations are based on).

Peer-to-peer

In the peer-to-peer (P2P) interaction model, the workload is partitioned between the nodes, or peers. All nodes have equal privileges and can share information directly without relying on a dedicated central server. Every peer can be both a client and a server. This topology is more permissive and cheaper to implement, as there is no need to bind a machine to a specific responsibility. However, it does have some downsides: each node needs to have a full copy of the data, and because all data is exchanged over the network without a dedicated coordinator, multiple copies can reach the same node.

Geo-distributed

The geo-distributed interaction model is most commonly seen in geo-distributed cloud data centers. It aims to solve challenges related to issues such as data privacy and resource allocation. One concern is that the latency of peer-to-peer communication in the geo-distributed model may be high, depending on the distance between the nodes. Therefore, when developing distributed machine learning workloads based on this interaction model, we need to provide a clear definition of how the nodes communicate and in which cases. An example of when the geo-distributed interaction model is a good choice is to enable federated learning with IoT/edge devices, where data cannot be centralized in one data center. Developing a system to train a model on each device across multiple decentralized nodes and assemble the output to create one cohesive model allows us to benefit from the data insights of all the devices, without exchanging private information.

Communication in a Distributed Setting

How our nodes communicate in a distributed environment has a significant impact on failure mechanisms, security, and throughput. Communication can be synchronous or asynchronous, depending on the needs of the distributed computation model. For example, a parameter server (a dedicated machine learning computing model mentioned earlier in this chapter) can be implemented with asynchronous or synchronous communications, and TensorFlow supports both synchronous and asynchronous training for distributing training with data parallelism.

Distributing machine learning workloads across more than one machine requires partitioning the data and/or the program itself, to divide the workload evenly across all machines. The decision of whether to use asynchronous or synchronous communications between the machines affects compute time and can lead to bottlenecks. For instance, shuffling data over the network can improve accuracy and help reduce overfitting. However, shuffling often involves writing data to the local disk before sending it over the network; this results in more input/output (I/O) operations, increasing the overall computation time and creating a bottleneck on the local disk, as well as a large amount of communication overhead. For such tasks, you need to examine the communication approach you take carefully.

Asynchronous

The underlying mechanism for asynchronous communication is a queue. Requests to a given node are placed in a queue to be executed and can eventually return a result or not. This mechanism is useful in systems where information exchange is not dependent on time, as there is no need to receive a response right away. You can think of it like a text message: you send a text to your friend asking about dinner plans for next Saturday, knowing that you will likely eventually get a response, but you don’t need it right away. Asynchronous communication allows for a flow of messages in a distributed system without blocking any processes while waiting for a reply; it’s generally preferred when possible.

Synchronous

The requirement for synchronous communication arises from the computer science function stack, where functions must be executed in a specific order—meaning that if a node sends a request to another node, it can’t continue processing further function logic while waiting for the response. You may use synchronous communication for specific distributed machine learning cases and leverage dedicated hardware like special network cables when necessary (some cloud vendors enable you to configure the network bandwidth2). Suppose you want to make dinner plans with your friend for tonight. Your actions will depend on your friend’s food preferences and availability and your chosen restaurant’s availability. You know that if you decide on a popular restaurant and don’t book a table now, there won’t be any space left. What do you do? Rather than sending a text, you pick up the phone and call your friend to collect the information synchronously; both of you are now blocked as you talk on the phone. You get the necessary information and continue to the following stage, which is contacting the restaurant.

Now that you have an idea of some of the main architectural considerations in a distributed machine learning topology, let’s take a look at a technique that has been gaining in popularity in machine learning applications in recent years: ensemble learning.

Introduction to Ensemble Methods

Ensemble machine learning methods use multiple machine learning algorithms to produce a single model with better performance and less bias and variance than the individual algorithms could achieve on their own. Ensemble methods are often designed around supervised learning and require a clear definition of model aggregation that can be used during prediction time. We’ll begin by exploring why these methods are useful, then look at the main types of ensemble methods in machine learning and some specific examples.

High Versus Low Bias

Bias is a major problem in machine learning, and reducing bias is one of the machine learning engineer’s main goals. A model with high bias makes too many assumptions about the results, leading to overfitting to the training data. Such a model tends to have difficulty making accurate predictions about new data that doesn’t exactly conform to the data it has already seen and will perform badly on test data and in production. Conversely, a model with low bias incorporates fewer assumptions about the data. Taken to an extreme, this can also be problematic as it can result in underfitting, where the model fails to learn enough about the data to classify it accurately. Models with high bias tend to have low variance, and vice versa. You can think of variance as the ability of a machine learning algorithm to deal with fluctuations in the data.3

Often, bias can also derive from the machine learning algorithm itself. For example, linear regression is a simple algorithm that learns fast but frequently has high bias, especially when used to model the relationship between two variables when no real linear (or close to linear) correlation exists between them. It all depends on the underlying relationship of the features.

Types of Ensemble Methods

In many cases, ensemble methods turn out to be more accurate than single models; by combining the individual predictions of all the component models, they are able to produce more robust results. In ensembles, each model is called a learner. We define the relationship between the learners based on the desired goal.

When we want to reduce variance, we often build dependencies between the learners by training them in a sequential manner. For example, we might train one decision tree at a time, with each new tree trained to correct the mistakes of the previous trees in the series. This strategy of building multiple learners with the goal of reducing previous learners’ mistakes is known as boosting. The ensemble model makes its final predictions by weighting the votes, calculating the majority vote, or calculating an overall sum that acts as the prediction or classification. An example is the gradient-boosted trees classifier (GBTClassifier) implemented in Spark MLlib. This is an ensemble technique to iteratively combine decision trees using a deterministic averaging process; the algorithm’s goal is to minimize the information lost in training/mistakes (more on this in Chapter 5). It’s important to be aware, however, that the iterative combination of the trees can sometimes lead to overfitting.

To avoid  overfitting, we might prefer training the learners independently in parallel and combining their predictions using bagging or stacking. With the bagging technique (short for bootstrap aggregation), we train each learner (typically all using the same machine learning algorithm) on a different part of the dataset, with the goal of reducing variance and overfitting and improving the accuracy of predictions on previously unseen data. The outcome of this ensemble method is a combined model where each learner makes its prediction independently and the algorithm collects all the votes and produces a final prediction. An example is the Random​Forest​Classi⁠fier implemented in Spark MLlib (random forest is an ensemble technique to combine independent decision trees).

Stacking is similar to bagging, in that it involves building a set of independent learners and combining their predictions using an ensemble function that takes the output from all the learners and reduces it into a single score. However, with stacking, the learners are usually of different types, rather than all using the same learning algorithm, which means they make different assumptions and are less likely to make the same kinds of errors. You can use any type of machine learning model as the combiner that aggregates the predictions. A linear model is often used, but it can be nonlinear, taking in the learners’ scores together with the given data—for example, a neural network where the base learners are decision trees. This approach is more advanced and can help uncover deeper relationships between the variables.

Tip

Ensemble methods are said to be homogeneous when the learners have the same base learning algorithm. Ensembles whose learners have different base learning algorithms are referred to as heterogeneous ensembles. Boosting and bagging are considered homogeneous ensemble methods, while stacking is a heterogeneous method.

Distributed Training Topologies

You can leverage cluster topologies to improve training and serving of ensemble models. Let’s take a look at a couple of examples.

Centralized ensemble learning

Centralized systems often use a client/server architecture, where client nodes are communicating directly with centralized server nodes. It resembles a star topology in computer networks. In a distributed machine learning deployment approach, this means that all the requests for predictions, classifications, etc., from the distributed models go through the main servers.

Whether there’s one server node or several that act as the final decision makers, there is a strict hierarchical logic for aggregation at the server level that happens in a centralized location. This topology is dedicated specifically to a distributed model workload and is not general-purpose. For example, consider the random forest ensemble learning method. RandomForest is a bagging algorithm that can be used for classification or regression, depending on the nature of the data, and aims to mitigate overfitting, as described earlier. A random forest consists of a collection of decision trees. When you decide to leverage RandomForest as your algorithm, the program making the queries will interact as a client interacting with the main server nodes. Those server nodes will send the queries to the tree nodes, collect answers (the output of the model) from the trees, aggregate the output based on ensemble logic, and return the answers to the client. The individual trees in the ensemble may be trained on completely different or overlapping datasets.

Decentralized decision trees

Decision trees can be deployed in a decentralized topology as well. You can use this approach when you want to provide answers on edge devices and are constrained by data privacy concerns, internet bandwidth, and strict time requirements for the responses. Decentralized decision trees are useful for Edge AI, where the algorithm is processed and the model is served locally on the device. Each node does not need to be permanently connected to the network, though it can leverage the network to improve the accuracy of its predictions and avoid overfitting. In this case, when an edge node receives a query requesting it to make a prediction, it sends the query to its parent and child nodes, which in turn send the query to their parents and children, and each node calculates and broadcasts its response. Each node will have its own aggregation function and can decide whether or not to use it based on whether its response is available within the specified time constraints. To keep communication overhead to a minimum, you can limit the number of “hops,” to define how far a query can travel. This constraint enforces a node neighborhood. A node’s neighborhood can change based on network and edge device availability.

Centralized, distributed training with parameter servers

In a centralized distributed training topology, the entire workload is processed in one data center. The machines are well connected and communicate over a shared network. The dataset and the training workload are spread out among the client nodes, and the server nodes maintain globally shared parameters. The server nodes act as parameter servers that all the client nodes share access to and consume information from—a global shared memory. They have to have fast access to the information and often leverage in-memory data structures. One family of machine learning algorithms that can leverage this topology is deep learning algorithms. With this approach, the parameters are broadcasted and replicated across all the machines, and each client node separately calculates its own part of the main function. The variables are created on the parameter servers and shared and updated by the client or worker nodes in each step.

Chapter 8 discusses this strategy in detail, with code examples illustrating making use of this topology with TensorFlow.

Centralized, distributed training in a P2P topology

In a peer-to-peer topology, there are no client and server roles. All nodes can communicate with all of the other nodes, and each node has its own copy of the parameters. This is useful for leveraging data parallelism when there are a fixed number of parameters that can fit into the nodes’ memory, the logic itself does not change, and the nodes can share their outcomes in a peer-to-peer manner. Gossip learning is one example of a method that uses this approach. Each node computes its model based on the dataset available to it and performs independent calls to its peers on the network to share its model with them. The nodes then each combine their current model with their neighbors’ models. As with decision trees in a decentralized deployment environment, this topology should be restricted with time constraints, and the maximum number of edges each node will broadcast the information to should be defined. With the P2P topology, you might also want to specify a protocol like MPI to standardize the workload.

The Challenges of Distributed Machine Learning Systems

Rome was not built in a day, but they were laying bricks every hour.

—John Heywood

Although you’re just beginning your journey into distributed machine learning, it’s important to be aware of some of the challenges that lie ahead. Working with distributed machine learning is significantly different from developing machine learning workloads that will run on one machine, and ultimately it’s your responsibility to build a system that meets the defined requirements. However, experienced practitioners will tell you that all requirements are negotiable and what can fail will fail. Both of these are true and should be kept in mind as you weigh your efforts throughout the process.

Performance

Improving performance is the fundamental goal of implementing a distributed system. Achieving higher throughput and performing end-to-end machine learning computations faster are critical justifications for distributed machine learning systems. There are many approaches you can take to improve performance, depending on your goal, your data, and the behavior of your system. Let’s take a look at some of the things you should consider and some problems you will likely face.

Data parallelism versus model parallelism

In computer science, distributed computing often goes hand in hand with parallel computing. Parallel computing on a single node/computer means using that single node’s multiple processors to perform various tasks at the same time. This is also called task-parallel processing. In comparison, in the context of distributed computing, parallel computing refers to using numerous nodes to perform tasks, with each node operating in parallel. When discussing distributed computation, parallel computation is a given and won’t be mentioned explicitly.

One of the most significant sources of confusion when approaching distributed machine learning is the lack of a clear understanding of what precisely is distributed across the nodes. In the machine learning workflow/lifecycle, you preprocess your data, perform feature engineering to extract relevant features, enrich the data, and finally ingest it into your machine learning algorithm along with a set of hyperparameters (parameters whose values are used to control the learning process, also known as the machine learning algorithm’s tuning parameters). During the learning process, the values/data ingested are affected by the hyperparameters, so it is recommended that you try out a wide range of hyperparameters to ensure that you identify the best possible model to use in production.

Dealing with a large set of data and a large set of tuning parameters raises the issue of how to manage your resources and the training process efficiently. Generally speaking, there are two approaches to training machine learning algorithms at scale. You can have the same algorithm with the same hyperparameters duplicated across all the nodes, with every machine running the same logic on its own piece of the data. Conversely, you can have each node running a different part of the algorithm, on the same set of data. Figure 1-3 illustrates the difference between these approaches: with data parallelism, the data is split into shards or partitions, and those partitions are distributed among the nodes, while with model parallelism, the model itself is split into pieces and distributed across machines.

Figure 1-3. Data parallelism and model parallelism

With data parallelism, each node runs the same computation logic, which means that code must also be distributed across the nodes. From node to node, the data input changes, but all the nodes run the same code, as illustrated in Figure 1-4.

Figure 1-4. Data parallelism: the same logic is distributed to all the machines, and each machine runs the logic with its local data

With model parallelism, multiple nodes each execute different pieces of the machine learning algorithm, and the distributed outputs are then assembled to produce the model itself. This approach is suitable for algorithms that can be parallelized by representing them in a directed acyclic graph, where the vertices represent the computations and the edges represent the data flow.

One challenge in serving an existing model is that sometimes the model itself can’t fit into one machine’s memory and requires adjustments to be served optimally. Think about the random forest ensemble method. It may be that the whole forest can fit into a single machine’s memory, but what if the forest has billions of trees? One option (which requires dedicated tools and hardware) is to divide the model into subsets, placing each one on a different machine, and have the machines communicate in a well-defined manner to serve the model efficiently. This approach is often employed by deep learning frameworks such as PyTorch and TensorFlow.

Combining data parallelism and model parallelism

Combining data and model parallelism is not at all straightforward, due to the nature of the existing open source tools and the complexity of building a dedicated system that will leverage both. However, whereas in the past we had to choose between data parallelism tools such as Apache Spark and model parallelism tools such as PyTorch, today many of these tools support each other, either natively or through extensions like Petastorm, Horovod, and others.

The need to combine both types of parallelism can have a significant effect on how long it takes to produce a new model, serve it, and start using it for prediction. For example, GPT-3 (Generative Pre-trained Transformer 3), a model developed by OpenAI, uses deep learning to produce human-like text. At its maximum capacity of 175 billion parameters, it is estimated that it would take 355 years and cost $4.6 million to train this model with a Tesla v100 GPU. For most companies, even with a far smaller number of parameters, this is an expensive and extremely slow option. Not only that, but it takes multiple tries to find suitable hyperparameters that will yield accurate results. We won’t discuss this GPT-3 further in the book, but it is important to know that it exists.

Deep learning

Deep learning algorithms pose a particular challenge for distributed machine learning performance. Deep learning is based on an artificial neural network (ANN) with feature learning, which means that the system discovers the features automatically from the raw data. Training a deep learning model requires forward computation and/or backward propagation. Forward computation is the act of feeding the data forward in the neural network (NN) to calculate the outcome. Backward propagation, or backpropagation, is the act of feeding the “loss” of accuracy backward into the neural network through the layers in order to understand how much of that loss every node is responsible for, and updating the weights of the neural network layers accordingly. To simplify it, you can think of it as feeding inaccuracies back into the model to fix them.

Both forward computation and backpropagation inherently require sequential computation. Each layer or stage must wait for input from the former stage. While we can distribute each stage on its own, the model training as a whole is sequential. As a result, we still need orchestration to enforce a sequence or some form of an automated pipeline during the training. One scheduling algorithm that enables us to run distributed deep learning workloads is gang scheduling. Based on this algorithm, the community introduced barrier execution mode in Apache Spark 2.4, which allows us to create groups of machines that work together on a stage and proceed to the next stage only when they have all finished. Barrier execution mode is part of Project Hydrogen, which aims to enable a greater variety of distributed machine learning workloads on the general-purpose Apache Spark framework.

Resource Management

Deciding how to split cluster resources is one of the biggest challenges in a distributed system. When you add in distributed machine learning workloads as well, it gets even more complicated. The reason for that is the need to improve performance by pairing the software with dedicated hardware. And it’s not only the GPU versus CPU conversation—today, Intel, NVIDIA, Google, and other companies are producing machines that are built with dedicated hardware chips for AI. These AI accelerators are built for high-performance massive parallel computation that goes beyond the traditional threaded algorithms. What’s more, many machine learning algorithms are still evolving. This is why Microsoft introduced field-programmable gate array (FPGA) chips to its cloud, as part of Project Catapult to enable fast real-time AI serving of deep learning. FPGAs have a reconfigurable design, which makes it easier to adjust the hardware as needed after software updates.

Resource sharing is also a challenge in a distributed environment, when there are competing workloads. When there is a need for 10 machines but only 5 are available, the software can either use what it has or wait for more machines to become available. This creates a bottleneck and can result in great pain. Think about a scenario where you are running machine learning training in a production environment to save on resources, and your training workloads compete for resources with your product’s real-time workloads. You might find yourself in trouble with customers at that point. This is why it’s best to have multiple environments for critical workloads and long-term/offline workloads. But what if your model is critical, and training it now on fresh data might allow you to discover an unforeseen real-time trend in the industry (and missing out on this might result in revenue loss)? You might need to maintain two environments for critical workloads, though that can be costly and results in a low return on investment (ROI).

Reconfiguring and resource sharing are only part of the story. Another challenge is automating the decision of when to use GPUs versus CPUs or FPGAs and other hardware options that are available in the market. With the cloud and a sufficient budget, we can get all the hardware we need, but again, we have to think about ROI. What should we do? How can we automate this decision? There is as yet no definitive answer to that question, but the happy news is that more and more software and hardware solutions are introducing support for one another. For example, NVIDIA created RAPIDS, which is a suite of open source libraries layered on top of NVIDIA CUDA processors. CUDA enables GPU acceleration of data science processes. With RAPIDS support for Apache Spark 3.0 accelerating not only data science workloads but also ETL/data preparation, we can potentially build a cluster based on it that will power both data preparation and model training and serving, eliminating the need to automate a switch in resources (although the question of ROI remains).

Fault Tolerance

Fault tolerance is what allows a distributed system to continue operating properly in case of a failure. In distributed machine learning, failures can take two forms:

  • Typical failure of a machine that can be detected and mitigated

  • Undetected failure of a machine that produced bad output

Let’s start with the first one. To better understand the need for a fault-tolerant procedure, ask yourself: If we distribute our workload to a cluster of 1,000 computational nodes, what will happen if one of those nodes crashes? Is there a way to fix it other than just restarting the job from the very beginning?

When one of the stages fails, do we need to recompute everything? The answer is no. Today, many distributed computation frameworks have a built-in procedure for fault tolerance: they achieve it by replicating the data and writing information to disk between stages for faster recovery. Other frameworks leave defining this mechanism up to us. For example, in TensorFlow with synchronous training, if one of the workers fails and we didn’t provide a fault tolerance procedure, the whole cluster will fail. This is why, when deciding on the TensorFlow distribution strategy, we need to pay attention to the fault tolerance mechanism. On the other hand, Apache Spark does not expose this decision to us. Rather, it has a built-in hidden mechanism that we cannot tweak from the machine learning API itself. Sticking with the automatic data-parallel workload fault tolerance mechanism in Spark saves us a lot of time by eliminating the need to think through the possible failure cases and solutions.

The second type of failure is specific to distributed machine learning, as it directly impacts the performance of the machine learning algorithm itself. In this case, we can look at it as if we have a Byzantine adversary machine or an intentionally or unintentionally faulty agent. Faulty agents (or adversaries) can harm the machine learning model’s performance by exposing erroneous data. It’s hard to mitigate such behavior, and the effects are highly dependent on the algorithm we use. Detecting such failures is one reason why it’s important to monitor machine learning models, as discussed in Chapter 10.

Privacy

Discussions of privacy in machine learning generally focus on protecting data collected from users/customers or protecting the model and parameters themselves. The model and its parameters can be the company’s intellectual property, and it might be important to keep them private (for example, in a financial market system).

One option for enforcing data privacy is avoiding the restrictions of centralizing data. That is, we want to build a model without uploading the members’ training data to centralized servers. To do that, we can leverage techniques like federated learning. With this approach, we train the algorithm on the edge devices, with each device using its own data. The devices then each exchange a summary of the model they built with other devices or a dedicated server. However, this approach is not foolproof. An adversarial attack can happen during the training itself, with the attacker getting hold of some or all of the data or the results of the training. This can easily occur during federated learning when the attacker takes an active part in the training process through their edge devices.

Let’s assume we have found a way to securely centralize the data or train the models without doing this. There is still a chance that a malicious actor can recover information about the data (statistics about a specific population, classification classes, and more) that we used to train the model by interacting with the model itself. As you can probably tell by now, there is no one-size-fits-all solution when it comes to ensuring privacy in machine learning—it requires dedicated technologies and architectures. Although privacy in distributed machine learning is a fascinating topic, it’s also a big one, and we won’t discuss it further in this book.

Portability

Portability ties back to the general challenges of a distributed system. When we add dedicated computing hardware such as multiple types of GPUs paired with the software we build, it makes moving the workloads from one cluster to another more difficult. In the early days of the cloud, many companies used the “lift and shift” migration strategy, moving their workloads and applications to the cloud without redesigning them. However, in many cases this resulted in higher costs since they didn’t take advantage of the environment’s features. Specifically, in the cloud, native features were built with optimizations for specific workloads. The same happens with a distributed machine learning approach: the various types of hardware available, the requirements, and the need to improve ROI by reducing development, storage, and compute costs can impact portability.

There are other challenges that are inherent to distributed systems that don’t relate directly to machine learning but can still have an impact on machine learning workloads, like trust or zero trust systems, network overhead, ensuring consistency, and more. We won’t cover those issues in this book, but you should consider them when designing your product strategy.

Setting Up Your Local Environment

Now that you understand the landscape better, let’s set you up for success! Many of the code samples in this book are available for you in the book’s GitHub repository. To gain hands-on experience with them, you should set up a learning environment, which you can run locally on your machine.

You’ll want to go through this setup process twice, first for the tutorials in Chapters 26 and then for the tutorials in Chapters 710.

Chapters 2–6 Tutorials Environment

To follow along with the tutorials in Chapters 2 to 6, make sure you have the latest version of Docker installed on your machine and follow these steps:

  1. Run Docker.

  2. In a terminal window/command line, run the following command:

    $ docker run -it -p 8888:8888 adipolak/ml-with-apache-spark

    This pulls an image of a PySpark Jupyter notebook with Apache Spark 3.1.1, which includes most of the libraries we’ll use. You will learn how to add the rest later. After executing this command, you will get a response like this:

    [I 13:50:03.885 NotebookApp] Serving notebooks from local directory: 
    /home/jovyan
    [I 13:50:03.885 NotebookApp] Jupyter Notebook 6.3.0 is running at:
    [I 13:50:03.885 NotebookApp] http://6cb805089793:8888/?token=e14171684af
    c305b4702cbda99ae879d3faecf5db6bea37d
    [I 13:50:03.885 NotebookApp] or http://127.0.0.1:8888/?token=e14171684af
    c305b4702cbda99ae879d3faecf5db6bea37d
    [I 13:50:03.885 NotebookApp] Use Control-C to stop this server and shut 
    down all kernels (twice to skip confirmation).
    [C 13:50:03.891 NotebookApp]
    
        To access the notebook, open this file in a browser:
            file:///home/jovyan/.local/share/jupyter/runtime/nbserver-8-open
    		.html
        Or copy and paste one of these URLs:
            http://6cb805089793:8888/?token=e14171684afc305b4702cbda99ae879d
    		3faecf5db6bea37d
         or http://127.0.0.1:8888/?token=e14171684afc305b4702cbda99ae879d3fa
    	 ecf5db6bea37d
    ^C[I 13:50:27.037 NotebookApp] interrupted
    Serving notebooks from local directory: /home/jovyan
    0 active kernels
    Jupyter Notebook 6.3.0 is running at:
    http://6cb805089793:8888/?token=e14171684afc305b4702cbda99ae879d3faecf5d
    b6bea37d
     or
    http://127.0.0.1:8888/?token=e14171684afc305b4702cbda99ae879d3faecf5db6b
    ea37d
    Tip

    Getting an error about AMD? Use this command instead:

    $ docker run -p 8888:8888 \ 
    	adipolak/amd-ml-with-apache-spark
  3. Copy the last URL with the token parameter. It will look something like this, but you will have your own token:

    http://127.0.0.1:8888/?token=43143a485357351ef522a1840f8c8c141a1be2bcf5f
    9b4de

    Paste it in your browser. This will be your Jupyter tutorial environment.

  4. Clone or download the book’s repo.

  5. Extract (unzip) the files and upload the notebooks and data files into Jupyter using the Upload button (see Figure 1-5).

Figure 1-5. Jupyter Upload button

The pyspark-notebook Docker image is very simple at this stage: it simply contains Jupyter and PySpark, the main tools used in this book. Figure 1-6 shows how the images in Jupyter Docker Stacks are stacked.

Figure 1-6. Jupyter Docker Stacks images

Chapters 7–10 Tutorials Environment

The tutorials in Chapters 710 require PySpark, PyTorch, Petastorm, TensorFlow, and everything related to the build, deploy, and serve parts of the machine learning lifecycle. You can follow the same steps outlined in the previous section to set up an environment for these chapters. In addition, to work with PyTorch, you’ll need to install it directly from the Jupyter terminal. Figure 1-7 shows you how to do this using the following conda command on macOS (note that during this process, you may be prompted to answer some questions about the installation):

$ conda install pytorch==1.12.1 torchvision==0.13.1 -c pytorch
Figure 1-7. Installing PyTorch in your environment

You may need additional resources for the tutorials in these chapters—for example, you might need more RAM for faster execution. To configure that, you can leverage the docker command with the --memory and --memory-swap tags. Make sure to define the amounts according to your machine’s capabilities:

$ sudo docker run -it --memory="16g" --memory-swap="24g" -p 8888:8888 \ 
	adipolak/amd-ml-with-apache-spark

Summary

In this chapter, we covered the very basics of distributed machine learning, touching lightly on a number of complex topics: machine learning workflows, distributed computing models, network topologies, distributed training and serving, and much more. As you know, Apache Spark supports parallel data processing across clusters or computer processors in real time. The framework is based on the MapReduce paradigm, which has numerous advantages when it comes to processing data, analytics, and machine learning algorithms. However, it also has some limitations, particularly with regard to deep learning workloads (as a result, I’ll show you how to bridge from Spark into deep learning frameworks in Chapter 7).

The next chapter provides a quick introduction to PySpark, to get you up to speed or help you brush up on the basics. Chapter 3 will get you started with machine learning lifecycle management using MLflow and show you how to package your experiments so that you can follow along with the tutorials in the rest of the book. You will learn how to utilize PySpark for your machine learning needs in Chapters 4, 5, and 6.

1 Depending on how you define it, this era of computer science history began in either 2012 or 2016—the amount of digital data estimated to exist in the world exceeded 1 zettabyte in 2012, and Cisco Systems announced that global IP traffic hit 1.2 zettabytes in 2016.

2 The bandwidth indicates how much data can be transmitted over a network connection in a given amount of time over a wired or wireless connection.

3 If the variance of the model is low, the sampled data will be close to what the model predicted. If the variance is high, the model will perform well on the training data but not on new data.

Get Scaling Machine Learning with Spark now with the O’Reilly learning platform.

O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.