One of the classic challenges of analytics is making data accessible to all. The data needed for analytics and data science is often locked away in different data silos, where it is difficult to discover and access. Analysts and data scientists who want to derive new insights from data within the enterprise must work with a large number of data stewards and data engineers to build their own map of data assets and source data from various data silos.
As a result of our move at FINRA to a managed data lake architecture in the cloud, our organization arrived at a solution to this problem and also significantly improved the flexibility of the data processing pipeline that prepares data for analytics. In this post, I’ll describe our approach.
A challenge of big data
FINRA is the Financial Industry Regulatory Authority, a not-for-profit organization authorized by Congress to protect America’s investors by making sure the broker-dealer industry operates fairly and honestly. FINRA’s Market Regulation group monitors 99% of all equity trades and 70% of all option trades in the U.S. This is done by processing billions of records a day of trade data from brokerage firms and exchanges. The data is validated, transformed, and prepared for analytic use. Once the data is ready for analytics, hundreds of automated detection models are run against the data to look for indicators of potential market manipulation, insider trading, and abuse—generating exception alerts when a pattern is matched. From there, regulatory analysts interactively delve deeper into the data to determine whether a regulatory issue exists. To stay abreast of emerging regulatory problems and develop new detection algorithms, a team of data scientists continually explores the data and develops new detection models.
To process and operate at these volumes, FINRA made early investments over a decade ago in cutting-edge, emerging data-warehouse appliances. This required a significant initial investment along with subsequent re-investments to expand capacity. Despite these investments, we still faced continual capacity challenges. Additionally, these appliances were complex to manage and operate in a dynamic business environment. Market volumes can fluctuate significantly day-to-day—sometimes by a factor of three or more. Regardless of fluctuations, FINRA must run its validation, ETL, and detection algorithms on the data within time frames specified in our service level agreements (SLAs). Investing in the capacity to meet anticipated peaks was cost prohibitive—it was not practical to dedicate constant capacity for processing the peak. As a result, when unanticipated peaks in market volume occurred, technology and business staff had to scramble to re-prioritize other business workloads in order to process the additional peak. This required additional operational support (people), monitoring 24x7 to allow intervention at the first sign of trouble.
A Hadoop alternative?
Facing these challenges, FINRA reviewed the options available at the time (2013). One emerging concept was the use of a large Hadoop cluster to create a “data lake”: a single, ever-growing store of data on the cluster’s HDFS, combined with the processing power to query it. A variety of tools (at the time, Hive, Pig, and custom MapReduce) could be run against the data on this cluster. With the concept of “schema-on-read,” a schema could be applied to data at time of query, instead of time of ingest. This change allowed a variety of tools to be used for processing data, so each job could use the tool best suited to it. Additionally, storage and compute capacity could be added in smaller increments using commodity hardware. This approach was much better than using the data warehouse appliance, which required a major capital expenditure every few years to upgrade capacity, along with the operational complexity of moving data during upgrades and expansions.
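To make “schema-on-read” concrete, here is a minimal Python sketch. The sample records, column names, and the `read_with_schema` helper are all hypothetical and are not taken from FINRA's pipeline; the point is only that the stored bytes stay untyped and each reader supplies its own schema at query time.

```python
import csv
import io
from datetime import date
from decimal import Decimal

# Raw data is stored exactly as received; no schema is enforced at ingest.
# (Hypothetical sample records, for illustration only.)
RAW_TRADES = "2016-03-01,ABC,100,10.25\n2016-03-01,XYZ,250,7.50\n"

# Two hypothetical schemas: different consumers interpret the same bytes differently.
FULL_SCHEMA = [
    ("trade_date", date.fromisoformat),
    ("symbol", str),
    ("quantity", int),
    ("price", Decimal),
]
SYMBOLS_ONLY = [("trade_date", str), ("symbol", str)]


def read_with_schema(raw_text, schema):
    """Apply column names and types while reading, not while storing."""
    rows = []
    for fields in csv.reader(io.StringIO(raw_text)):
        # zip stops at the shorter input, so a narrow schema ignores extra columns.
        rows.append({name: cast(value) for (name, cast), value in zip(schema, fields)})
    return rows


typed_rows = read_with_schema(RAW_TRADES, FULL_SCHEMA)
symbol_rows = read_with_schema(RAW_TRADES, SYMBOLS_ONLY)
```

Both calls read the same stored text. Only the schema passed at read time differs, which is what lets several tools share one copy of the data.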
While a Hadoop data lake appeared better than the previous state, there still remained three fundamental challenges:
- Capacity was still not truly dynamic—While it was (relatively) easy to add capacity, this was still a process that could take days (or weeks to months if additional servers needed to be procured). Additionally, while it was possible to add capacity for peaks, it was not possible to shrink capacity once the peak had passed. We were still faced with the challenge of sizing for anticipated peak demand.
- Capacity could not be optimized to balance storage and compute—The second challenge was that by continuing the use of a paradigm where storage and compute capacity were combined together at the node (server) level, we often had idle processing capacity that did not match our storage needs.
- Managing a Hadoop cluster can be complex—FINRA already had experience with running Hadoop clusters in a production environment. We knew that running a multi-petabyte Hadoop cluster would be operationally complex. Operations staff experienced with Hadoop needed to continually monitor and maintain the cluster: replacing faulty hardware before it resulted in losing data, adding capacity, and managing backups. All these tasks add cost and complexity.
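The second challenge above can be illustrated with a little arithmetic. The sketch below uses made-up numbers (they are not FINRA's figures) to show why coupling storage and compute at the node level leaves capacity idle: the cluster must be sized for whichever resource needs more nodes.

```python
import math


def coupled_nodes(storage_tb, cores_needed, tb_per_node, cores_per_node):
    """Nodes needed when every server supplies both storage and compute."""
    for_storage = math.ceil(storage_tb / tb_per_node)
    for_compute = math.ceil(cores_needed / cores_per_node)
    # The cluster has to satisfy the larger of the two requirements.
    return max(for_storage, for_compute)


def idle_cores(storage_tb, cores_needed, tb_per_node, cores_per_node):
    """Cores that are paid for but not needed by the workload."""
    nodes = coupled_nodes(storage_tb, cores_needed, tb_per_node, cores_per_node)
    return nodes * cores_per_node - cores_needed


# Hypothetical workload: 2,000 TB of data but only 320 cores of steady processing,
# on servers that each hold 40 TB and have 16 cores.
nodes = coupled_nodes(2000, 320, tb_per_node=40, cores_per_node=16)
unused = idle_cores(2000, 320, tb_per_node=40, cores_per_node=16)
```

With these assumed numbers, storage alone requires 50 servers while compute requires only 20, so 480 of the 800 cores sit idle.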
So, we started looking for an approach that would give us the benefits of a data lake without the operational complexity and cost. Cloud offered the promise of pay-as-you-go resources that could be added or removed as needed based on workload. Could this address the problem of matching capacity to demand?
The cloud data lake—beyond traditional Hadoop
Eventually, we arrived at an approach that preserves the key data lake concepts (a single repository of the data, the ability to use multiple tools, “schema-on-read,” and the ability to secure the data centrally) while reducing operational complexity and cost by leveraging cloud services. We accomplished this by separating the key components of a database (catalog, query/compute engine, and storage) and projecting them onto separate cloud services:
Storage—Using an object store (e.g., AWS Simple Storage Service—S3) provides a way to store and manage data with minimal operational effort. Features that are often difficult to perform well, such as capacity management, capacity forecasting, access control, encryption, disaster recovery and archiving, are all easy to configure and automate when abstracted as a service. Adding data to our repository is as simple as storing it in S3, applying the appropriate access controls, and registering with our centralized catalog (next).
Catalog—A catalog that can store technical metadata (like DDL) is essential to a database. Additionally, it would be ideal for a catalog to also be able to contain business and operational metadata (such as lineage). We added the additional objective of a catalog that was not bound to a particular operating stack (e.g., Hadoop). Ideally, it would be a catalog that could reference data sets in other stores. After surveying the market options available at the time (early 2013), we did not find any suitable open source or commercial offering. So, we developed our own catalog, which can store technical metadata that is needed to support querying and data fixes. In addition, it features a UI that allows data scientists and other consumers to explore the data sets, layering in business metadata on top of the technical metadata. Portions of this data catalog have been open-sourced as the herd project.
Query/Compute—Having abstracted away the problem of storage management and catalog, we were left with the challenge of how to handle processing. AWS’s Elastic MapReduce (EMR) service allowed us to create and destroy Hadoop processing clusters as needed—all via simple API commands. On the cluster, we could load software (Hive originally, now Presto and Spark) that allowed our developers and analysts to use the tool they were familiar with (SQL) to access data. EMR supports the ability to run queries from these tools while keeping the data on the object storage layer (S3). This created what was effectively a single, multi-petabyte database. SQL queries can be done from either a single compute cluster or multiple clusters running in parallel, all reading against the same data set in S3.
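The separation of storage, catalog, and query/compute can be sketched in a few lines of Python. This is an illustrative toy and not the herd API: the `DataSetEntry` and `Catalog` classes, the field names, and the `s3://example-bucket/...` location are all assumptions made for the example. It shows a catalog that records where a data set lives and what its columns are, so that any number of independent compute clusters can be pointed at the same stored objects.

```python
from dataclasses import dataclass, field


@dataclass
class DataSetEntry:
    name: str
    location: str          # where the objects live, e.g. an object-store prefix
    columns: dict          # technical metadata: column name -> type
    description: str = ""  # business metadata
    lineage: list = field(default_factory=list)  # operational metadata: upstream data sets


class Catalog:
    """Toy in-memory catalog; a real one would be a shared, durable service."""

    def __init__(self):
        self._entries = {}

    def register(self, entry):
        self._entries[entry.name] = entry

    def lookup(self, name):
        return self._entries[name]

    def external_table_ddl(self, name):
        """Render Hive-style DDL that points a query engine at the stored data."""
        entry = self.lookup(name)
        cols = ", ".join(f"{col} {typ}" for col, typ in entry.columns.items())
        return f"CREATE EXTERNAL TABLE {entry.name} ({cols}) LOCATION '{entry.location}'"


catalog = Catalog()
catalog.register(
    DataSetEntry(
        name="trades",
        location="s3://example-bucket/trades/",  # hypothetical bucket
        columns={"symbol": "STRING", "quantity": "BIGINT"},
        description="Daily trade records",
    )
)
ddl = catalog.external_table_ddl("trades")
```

Because the DDL only references a location, every cluster that runs it sees the same data, and adding or removing a cluster never moves or copies the data itself.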
As I am writing this post, we have more than 70 distinct compute clusters (over 2,000 servers in total) running simultaneously in our production environment. Some clusters are tuned for user queries. Others run cost-effective batch ETL and batch analytic routines. Being able to run multiple clusters simultaneously lets us parallelize workloads to meet unexpected business needs while still meeting routine SLAs. Also, while we use EMR for more than 90% of our processing, our platform-independent data catalog and separate storage layer allow us to use other services for specialized processing. We use AWS Lambda for highly parallelized, serverless file validation. We use the Redshift database service for certain query-intensive data marts. We have flexibility in the query/compute technology we use.
By re-thinking the analytic data warehouse as a series of services, we have been able to achieve significant operational benefits for our data processing. Now, when market volumes fluctuate, we can increase the capacity of our validation, ETL, and detection algorithms immediately to process the additional workload, all through a series of simple API commands. When the volume drops, we shrink the size of the processing clusters. In some cases, we can shrink them to zero if there is a lull in the incoming data stream. Because our data remains stored on S3 rather than on the clusters themselves, we can use the AWS Spot pricing market to purchase capacity at a fraction of the standard rate. If we lose a cluster due to Spot market conditions (always a possibility), we can easily re-provision the cluster and resume processing from the most recent checkpoint.
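The two ideas in this paragraph, sizing clusters to the incoming volume and resuming from a checkpoint after losing a cluster, can be sketched as follows. Everything here is a hypothetical stand-in: the function names, the `ClusterLost` exception, and the dictionary used as checkpoint storage are assumptions for illustration, and no real AWS API is called.

```python
import math


def nodes_for_volume(records, records_per_node, max_nodes):
    """Size a cluster to the current backlog; zero records means zero nodes."""
    if records <= 0:
        return 0
    return min(max_nodes, math.ceil(records / records_per_node))


class ClusterLost(Exception):
    """Raised by the hypothetical worker when its Spot capacity is reclaimed."""


def process_with_checkpoints(batches, process_batch, checkpoint):
    """Process batches in order, resuming after the last completed one.

    `checkpoint` is a dict standing in for durable state kept outside the
    cluster, so it survives the loss of the cluster itself.
    """
    while checkpoint["next"] < len(batches):
        try:
            process_batch(batches[checkpoint["next"]])
        except ClusterLost:
            # A replacement cluster would be provisioned here; the loop then
            # retries the same batch instead of starting over.
            continue
        checkpoint["next"] += 1


# Usage: a worker that loses its cluster once, while processing batch "b".
completed = []
lost_once = {"done": False}


def flaky_worker(batch):
    if batch == "b" and not lost_once["done"]:
        lost_once["done"] = True
        raise ClusterLost()
    completed.append(batch)


state = {"next": 0}
process_with_checkpoints(["a", "b", "c"], flaky_worker, state)
```

In this sketch, batch "a" is not repeated after the simulated loss because the checkpoint already points past it. A production version would also cap the number of retries, which this sketch omits for brevity.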
We can now easily buffer against unexpected changes in market volumes and still meet our SLAs. Additionally, it is very easy to accommodate unexpected business requests for out-of-cycle processing. One condition we frequently encounter is the need to re-process data when an error is later detected in upstream data sources. These errors are outside our control, and correcting them frequently requires significant processing resources. On our old data appliance infrastructure, reprocessing jobs had to wait weeks for a processing window when they could run without interrupting our normal data flows. With the managed data lake architecture, it is very easy to create additional processing clusters on EMR to execute the reprocessing at the same time the normal processing is occurring. We regularly create and destroy more than 10,000 nodes (servers) of compute capacity daily. All this allows us to complete reprocessing in a few days, not weeks.
Making access easier
While we had hoped to achieve operational benefits by moving to the new architecture, we were surprised that the approach also made our entire data set much more accessible to users such as data scientists who needed to explore the data to develop new models and insights.
Historically, it was difficult for our data scientists to explore the scope of data sets that were available and then actually obtain the data of interest for use in model development. They might need to work with multiple data engineers who were responsible for different sets of data—both to understand the data and to obtain access to the physical database where it resides. Often, the needed data had been archived to tape stored offsite, which required a time-consuming restore process. This could take weeks. Additionally, even after data sets were obtained, a data scientist might need to work with other engineers to provision a temporary data mart of sufficient capacity for them to perform their model development. This could mean weeks just to get started on model building. If there was a subsequent desire to incorporate additional data sources into the model development process, additional weeks could pass. Given the many course corrections and dead ends involved in model building, this severely limited the velocity for building new models and analytics using the data.
With the managed data lake architecture, our data scientists now have easy access to all of our data. Through the searchable interface of our data catalog, they can explore all of our data sets based on business metadata to find data sets of potential interest. Also from within the catalog, they can explore the technical metadata associated with a data set to learn how to query or extract it. However, the real power comes from integrating query tools with the catalog to go the next step and give data scientists self-service access to all the data. Because all the data is stored on S3 and registered in the central catalog, a single query cluster running Presto or Spark can access all of our data. Further, it is easy to quickly combine data sets from across the data lake and create temporary data marts for use with model development, all enabled via self-service.
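As a rough illustration of discovery through business metadata, the sketch below searches a small list of catalog records by keyword. The data set names, descriptions, tags, and the `search` function are invented for this example and do not reflect the actual catalog's contents or interface.

```python
# Hypothetical catalog records: business metadata attached to each data set.
DATA_SETS = [
    {
        "name": "equity_trades",
        "description": "Equity trade reports from exchanges",
        "tags": {"equities", "trades"},
    },
    {
        "name": "option_quotes",
        "description": "Option quotes by exchange",
        "tags": {"options", "quotes"},
    },
    {
        "name": "firm_reference",
        "description": "Broker-dealer firm reference data",
        "tags": {"reference"},
    },
]


def search(data_sets, keyword):
    """Return names of data sets whose description or tags mention the keyword."""
    needle = keyword.lower()
    return [
        d["name"]
        for d in data_sets
        if needle in d["description"].lower() or needle in d["tags"]
    ]


exchange_sets = search(DATA_SETS, "exchange")
option_sets = search(DATA_SETS, "Options")
```

Once a data set of interest is found this way, its technical metadata (location and schema) is what a query cluster would need in order to read it.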
FINRA’s entire Market Regulation application portfolio has been running in the cloud using the cloud data lake for more than a year. We have more than four petabytes (and growing) of catalog-registered data accessible for query. Processing challenges that were difficult to solve are now easy, and we’ve reduced the friction our data scientists experience, allowing them to focus on developing new detection models to stay ahead of emerging regulatory issues.
For organizations that are having challenges keeping up with growing demand for analytics or managing exploding data volumes, rethinking the data warehouse as a virtual database based on cloud services is worth considering. If you have questions about how FINRA moved its entire Market Regulation portfolio to the cloud and how we manage and operate our environment on a daily basis, feel free to reach out to me.