AI and Analytics in Production by Ellen Friedman, Ted Dunning

Chapter 4. Example Data Platform: MapR

You probably didn’t wake up this morning thinking, “I’m gonna get me a global namespace!” But you likely do want the things it can make possible for your production systems. What makes this and other capabilities of a data platform interesting is knowing how they can help in getting systems into reliable production. This isn’t always obvious, especially as new technologies offer features that are just that, new. In this chapter, we describe one example technology: the MapR data platform. Most of the insights we’ve discussed so far, plus the design patterns we describe in the next chapter, are based on watching how people build successful production systems, and this is the platform they used.

Only a few of the design patterns we describe in Chapter 5 absolutely require capabilities unique to MapR, but all of them demonstrate good practice in pushing platform-appropriate functions down to the platform rather than trying to implement them at the application level. That’s an important general lesson regardless of the platform you use.

Understanding how the MapR data platform works will make it easier to see the key ideas in the design patterns. Once you see that, you’ll understand how to adapt them to your own needs. We also think you might find this technology interesting in its own right. Toward that goal, we use the first half of the current chapter to give you a grounding in some key capabilities of the MapR platform in the context of production deployments and then go on in the second half of this chapter to describe the underlying technology.

Note

Fundamental features of the MapR platform—files, tables, and streams—are all engineered together into a single technology, part of the same code and able to run on the same cluster instead of being separate systems that work together via connectors.

A First Look at MapR: Access, Global Namespace, and Multitenancy

One of the key distinctions of MapR’s data platform is that it provides a real-time, fully read-write file system. This capability means that you not only can interact with data stored on the cluster via Spark or Hadoop commands and applications, but you also can access the exact same data via more traditional methods and legacy applications. Any program in any language running on a Linux or Windows system can access files in the MapR cluster using standard file input/output (I/O) mechanisms.

This broad compatibility with the MapR file system (MapR-FS) is possible primarily because MapR-FS allows access to files via a FUSE interface or via Network File System (NFS), both supporting a POSIX API. Additionally, files on a MapR system can also be accessed via the HDFS API and via the S3 API. This wide variety of access methods is very different from Hadoop Distributed File System (HDFS), the file system that Hadoop distributions use for distributed data storage, which supports only limited APIs and semantics. That’s part of the reason for the sense you get of a wall between data in a Hadoop cluster and whatever non-Hadoop processing or modeling you want to do.

What are the implications of MapR-FS being a read/write file system? One effect is that existing applications (so-called legacy code) can access data (big or small) without needing to be rewritten using Spark or Hadoop. This interoperability eliminates copy steps and saves considerable time. Fewer steps help make success in production more likely.
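To make the idea of standard file I/O concrete, here is a minimal sketch of a program that appends to a file, overwrites bytes in the middle of it, and reads it back using nothing but ordinary Python file calls. It assumes that on a real cluster `base_dir` would point at a directory under the FUSE or NFS mount (for instance, a path like the `/mapr/se1/user/tdunning` directory shown in Figure 4-1). A temporary local directory stands in for the mount here, and the file name and helper functions are hypothetical.

```python
import os
import tempfile


def append_reading(path, line):
    """Append one text record to a file using ordinary file I/O."""
    with open(path, "a", encoding="utf-8") as f:
        f.write(line + "\n")


def update_in_place(path, offset, data):
    """Overwrite bytes in the middle of an existing file (a random write)."""
    with open(path, "r+b") as f:
        f.seek(offset)
        f.write(data)


def read_all(path):
    """Read the whole file back as text."""
    with open(path, "r", encoding="utf-8") as f:
        return f.read()


# Assumption: a temporary directory stands in for a directory on the
# cluster mount so that this sketch runs anywhere.
base_dir = tempfile.mkdtemp()
log_path = os.path.join(base_dir, "readings.txt")

append_reading(log_path, "pump-1,OK")
append_reading(log_path, "pump-2,OK")

# Replace the two status bytes of the first record without rewriting the file.
update_in_place(log_path, len("pump-1,"), b"XX")

contents = read_all(log_path)
```

Nothing in this code knows about Spark, Hadoop, or the cluster itself, which is the point: the same program would work unchanged against a local disk or a mounted cluster path.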

Another surprising characteristic of MapR-FS is that the file system is extended to include tables and streams on a par with files. You can access MapR streams by using the Apache Kafka API, and tables can be accessed using the Apache HBase API or the OJAI document database API, but the data is actually stored in the MapR file system, in the same system that stores file data. We talk in more detail about how that works in the second part of this chapter. The big idea is that having files, tables, and streams as objects in the same file system reduces complexity in infrastructure and architecture and often requires fewer servers because different services can share the same cluster. You can also use the same security and same administration for all three kinds of objects.

Equally surprising is that all of these data structures live in the same directories and share a global namespace, as you can see in Figure 4-1. Actually, we have known for decades that being able to name and organize files was a good thing that makes development easier and less error prone, so it makes sense that this is good for tables and streams. Directories and a global namespace simplify data management for administrators, as well.

Figure 4-1. A screenshot of a user listing the contents of a directory on a MapR system. This directory contains conventional files, streams, and a table, as well as other directories. Note that conventional Linux utilities are used here even though the data is stored on a highly distributed system.

With an HDFS cluster, you have directories for organizing files but no analogous way to organize tables or streams short of proliferating lots of small special purpose clusters. If you use Apache Kafka for stream transport you can assign data to topics, but you don’t have any good way to avoid topic name collision between applications.

You may also have noticed in Figure 4-1 that the path name is divided into a cluster name (the /mapr/se1 part) and a volume mount point part (the /user/tdunning part). MapR-FS has something called a volume, which is essentially a directory with specialized management capabilities that enable scalability, disaster recovery, and multitenancy. To users and applications, volumes appear to be ordinary directories. From an administrator’s point of view, however, a volume has a number of additional capabilities that include data locality, snapshots, permissions, and mirrors.

Volumes allow you to position data explicitly within a cluster using a feature known as data locality. You can use this to prevent performance degradation for particular data sets due to heavy I/O loading from rogue jobs (i.e., almost anything developers or data scientists do without proper supervision). You also can use data locality to ensure that data being consumed by high-performance hardware such as Graphics Processing Units (GPUs) is close to the GPUs or is stored on high-performance storage devices such as flash memory. Regulatory and compliance issues may motivate a need to physically isolate some data in a cluster onto certain machines without impairing the ability to use that data.

Volumes also allow exact point-in-time snapshots to be taken manually or via automated scheduling, a valuable capability to protect against human error or for data versioning. Entire volumes can be atomically mirrored to other clusters for testing, development, or disaster recovery purposes. These are additional ways that you can push many tasks to the platform for automatic execution rather than leaving them as a burden for the application developer.

MapR-FS also automatically controls and prioritizes I/O loads caused by system functions such as data mirroring, recovery from hardware failures, or moving data to cold storage. Correct prioritization of such loads is tricky because prioritizing application I/O loads can actually degrade reliability if lost replicas of data are not re-created quickly enough. Conversely, prioritizing recovery loads can interfere with applications unnecessarily. MapR-FS handles this variable prioritization by controlling I/O loads on an end-to-end basis.

Geo-Distribution and a Global Data Fabric

You can connect MapR clusters together into a single data fabric by near real-time replication of tables and streams. This means that an application developer or data scientist can just read or write data in a stream or table as if it were local, even if it decidedly is not. A program can write data into a stream in one data center and an analytics program could read it in another, but each application would not need to know where the other is. Data could have come from an instrument in front of you or across the ocean—it feels the same. You don’t need to waste time at the application level making sure bytes move reliably; the platform takes care of this for you.

Figure 4-2 shows a highly simplified diagram of how a data fabric is built using the multimaster stream and table replication capabilities of the MapR platform. The figure shows two clusters within the data fabric. The first has two processes analyzing file, stream, and table data. The stream and table data from the first cluster are replicated to a second cluster where another program could analyze the data, as well.

Figure 4-2. The streams and tables in one MapR cluster can be replicated to streams and tables in other clusters to form a coherent data fabric. This replication allows multimaster updates. Either or both clusters could be in an on-premises data center or hosted in a public cloud.

Data can also be moved between clusters in your data fabric using volume mirroring, and mirroring to a remote cluster sets you up for disaster recovery. We describe the details of how mirroring and stream/table replication work later.

You can connect small footprint MapR clusters to the data fabric forming an Internet of Things (IoT) edge. This makes it feasible to collect data from many distributed sources and, if desirable, do processing, aggregation, or data modeling at the edge rather than having to move all data back to a central cluster. Edge Computing is one of the basic design patterns that we describe in Chapter 5.

Implications for Streaming

Streaming architecture and streaming microservices require decoupling of data sources and data consumers and thus are based on stream transport technology in the style of Apache Kafka, which includes MapR streams (see Chapter 2 and Figure 2-7 for an explanation). Although MapR streams support the open source Kafka API, they are implemented very differently, and that gives MapR streams additional capabilities beyond those of Kafka. Both are useful as connectors between microservices, but MapR’s stream replication, ability to handle a huge number of topics, and integration into the MapR file system set it apart from Kafka. Let’s see how these capabilities play out for production applications.

For systems based on streaming microservices, MapR streams are handy because each stream bundles an independent set of topics. This avoids inadvertent topic name collision between services, a potential problem with Kafka, in which all topics are in a single flat namespace. The scoping of topic names within MapR streams makes it practical, for example, to run multiple rendezvous frameworks for managing different machine learning systems on the same MapR cluster without interference even if the frameworks use the same topic names (in their own separate streams).
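The difference between a flat topic namespace and stream-scoped topics can be illustrated with a toy model. The sketch below is not the Kafka or MapR API; it is a pure-Python stand-in, and the class names, stream paths, and message values are all hypothetical. It shows two services that both publish to a topic named `requests`: in the flat model their messages are mixed together, and in the scoped model each stream keeps its own copy of that topic name.

```python
from collections import defaultdict


class FlatBroker:
    """Toy model of a single flat topic namespace."""

    def __init__(self):
        self.topics = defaultdict(list)

    def publish(self, topic, message):
        self.topics[topic].append(message)


class StreamScopedCluster:
    """Toy model in which every topic belongs to a named stream."""

    def __init__(self):
        # stream path -> topic name -> list of messages
        self.streams = defaultdict(lambda: defaultdict(list))

    def publish(self, stream_path, topic, message):
        self.streams[stream_path][topic].append(message)

    def read(self, stream_path, topic):
        return list(self.streams[stream_path][topic])


# Two independent rendezvous frameworks that happen to pick the same topic name.
flat = FlatBroker()
flat.publish("requests", "fraud-req")
flat.publish("requests", "churn-req")

scoped = StreamScopedCluster()
scoped.publish("/apps/fraud/rendezvous", "requests", "fraud-req")
scoped.publish("/apps/churn/rendezvous", "requests", "churn-req")

mixed = flat.topics["requests"]
fraud_only = scoped.read("/apps/fraud/rendezvous", "requests")
churn_only = scoped.read("/apps/churn/rendezvous", "requests")
```

In the flat model, avoiding the collision requires a naming convention that every team follows, or a separate cluster. In the scoped model, the stream path does that job.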

One of the biggest differences between MapR streams and Kafka is that MapR streams are built into the file system, and that has many implications. For one thing, with MapR, adding streams or adding topics to existing streams (even hundreds of thousands or millions) does not affect the effort to administer the cluster, but adding a new Kafka broker cluster does increase the effort required to administer Kafka. This characteristic of MapR reduces cluster sprawl compared to Kafka, which has difficulty efficiently handling more than about 1,000 topic partitions per broker in the Kafka cluster. For MapR, policies such as stream replication, message time-to-live (TTL) and access control expressions (ACEs) are applied at the stream level for many topics together. These capabilities are important for design patterns such as the IoT Data Web and other situations for which consistent control of data from many sources is desirable. Figure 4-3 illustrates an industrial IoT use case inspired by an oil and gas exploration company.

Figure 4-3. In this industrial IoT example, data from sensors on individual pumps are assigned to separate topics bundled together into a MapR stream, shown as a horizontal cylinder in the diagram. In reality, there could be thousands of topics handled by the stream or more. Here, the data for a digital twin is pulled from just one topic, pump-1. Another consumer, a dashboard, subscribes to topics pump-1 through pump-4.

The way that MapR streams are embedded in the MapR file system also means that pieces of topic partitions are distributed across the cluster. With Kafka, partition replicas are restricted to a single machine. That limitation makes a practical difference. With MapR streams it is reasonable to set the TTL to months, years, or even forever, thus providing a long-term, replayable, event-by-event history. This playback capability is needed for use cases that employ the design pattern we call Streaming System of Record. We mentioned this approach in the use cases described in Figure 2-6, where a message stream with a long TTL might serve as the system of record for security analytics.

Keep in mind that MapR streams being a first-class part of the file system means that you don’t need a separate cluster just for stream transport as you do with Kafka. Every stream in a MapR cluster is the equivalent of an entire cluster of Kafka brokers. You can have as many streams on a MapR cluster as you like, each containing hundreds of thousands of topics or more. The more familiar a person is with Kafka, the harder it can be for them to realize that with MapR you don’t need a separate cluster for stream transport; you stream messages on the same cluster where your Flink, Spark, or other applications are hosted. This reduces operational complexity and avoids a network bottleneck. This difference was highlighted in a discussion about benchmarking Apache Flink on MapR streams that appeared in the book Introduction to Apache Flink by Friedman and Tzoumas (O’Reilly, 2016; see Chapter 5).

A final implication of MapR streams being built into the MapR file system is the role of the data platform working in conjunction with Kubernetes orchestration of containerized applications. Using the same platform across your entire data fabric, you can persist state from containerized applications in any form you like: as streams, files, or tables. This capability avoids the need for a swarm of containerized Kafka clusters, separate file stores, HDFS clusters, and databases.

Now you’ve seen how you can put to use some of the key capabilities of the MapR platform. In the rest of this chapter, we give an overview of the technology underlying these capabilities to show you how it works.

How This Works: Core MapR Technology

Internally, all data in MapR-FS is stored in database-like structures called B-trees that coordinate the storage of data arranged as 8 KiB blocks. Each tree lives on only a single machine, but each one can reference other trees on other machines. By choosing the key and the data in these trees, it’s possible to implement an entire file system complete with directories and files. A tree can be a directory if the keys in the tree are file names and the values are references to files. These files are also trees whose keys are offsets and whose values are references to chunks. Each chunk is a tree whose keys are offsets and whose values are references to actual pages of storage.

These trees can also emulate other interesting kinds of data storage. A tree can become a document database where keys are database keys, and values are encoded rows. Or they can implement a message queue where keys are combinations of topic and message offset.

This technical generality is what allows MapR-FS to contain directories, files, streams, and tables. All of these objects share their underlying implementation in terms of trees. There are a few additional objects that help manage the distributed data, but the core principle is the representation of everything in terms of these primitive B-trees.
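The idea that one ordered key-value structure can play all of these roles is easier to see in code. The following is a minimal sketch, assuming a tiny in-memory `Tree` class as a stand-in for a B-tree (a sorted key list plus a dictionary, not a real B-tree and not MapR code). All names and values are hypothetical. Only the choice of keys and values changes between the directory, the file, the table, and the stream.

```python
import bisect


class Tree:
    """A tiny ordered key-value map standing in for one B-tree."""

    def __init__(self):
        self._keys = []      # keys kept in sorted order
        self._values = {}    # key -> value

    def put(self, key, value):
        if key not in self._values:
            bisect.insort(self._keys, key)
        self._values[key] = value

    def get(self, key):
        return self._values[key]

    def scan(self, start, end):
        """Yield (key, value) pairs with start <= key < end, in key order."""
        lo = bisect.bisect_left(self._keys, start)
        hi = bisect.bisect_left(self._keys, end)
        for k in self._keys[lo:hi]:
            yield k, self._values[k]


# A file: keys are byte offsets, values stand in for chunk references.
report = Tree()
report.put(0, b"hello ")
report.put(6, b"world")

# A table: keys are row keys, values are encoded rows.
users = Tree()
users.put("u001", {"name": "Ada"})
users.put("u002", {"name": "Grace"})

# A stream: keys combine topic and message offset.
events = Tree()
events.put(("pump-1", 0), "start")
events.put(("pump-1", 1), "stop")
events.put(("pump-2", 0), "start")

# A directory: keys are names, values are references to other trees.
home = Tree()
home.put("report.txt", report)
home.put("users", users)
home.put("events", events)


def read_file(tree):
    """Concatenate the chunks of a file tree in offset order."""
    return b"".join(chunk for _, chunk in tree.scan(0, float("inf")))


file_bytes = read_file(home.get("report.txt"))
pump1_messages = [
    msg
    for _, msg in home.get("events").scan(("pump-1", 0), ("pump-1", float("inf")))
]
```

Because keys are kept in order, reading a file or replaying one topic is just a range scan. The real system adds distribution across machines, which this sketch leaves out.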

Comparison with Hadoop

Some people equate big data with Hadoop, but the needs of big data have migrated significantly over the past few years. Practically speaking, the MapR data platform has several differences relative to Hadoop-based storage using HDFS that change the scope for applications that run on MapR.

These differences include:

  • HDFS has one or more special servers known as name nodes that hold all metadata for a cluster. Most commonly, a single name node is used, possibly with a backup (we talk about how name nodes can be federated later). The problem is that this name node is a bottleneck, a scalability limit, and a substantial reliability risk. Name node failure can (and we have heard of cases where it did) result in substantial data loss. In contrast, MapR has no name node; metadata is distributed across the cluster for higher reliability, higher performance, and less chance of bottlenecks.

  • MapR-FS supports real-time updates to files, tables, and streams. In contrast, real-time use of HDFS is difficult or impractical. This limitation in HDFS comes about because every write to a file extends the length of the file when the write is committed. This requires a round-trip transaction to the name node, so programs try to commit writes only rarely. In fact, it is common for multiple blocks composed of hundreds of megabytes of data to be flushed all at once.

  • HDFS supports only a write-once, read-many model: once a file has been written, it cannot be updated except by appending new data to the end.

  • Files, tables, and streams in the MapR platform have full multireader/multiwriter semantics. HDFS allows only a single writer or multiple readers, which means that readers cannot read files while a writer still has them open for writing. This makes real-time processing nearly impossible.

  • The content of files stored in MapR can be written in any order. This is required for many legacy systems. HDFS can’t do out-of-order writes. Network protocols like NFS often result in out-of-order writes, as do databases such as Postgres, MySQL, or Sybase.

  • The number of objects in a single MapR-FS cluster is practically unlimited; billions or trillions of files are viable. This makes the MapR data platform usable for many object storage applications, as mentioned in the design pattern called Object Store. With HDFS, the name node has to keep track of all of the blocks in all of the files in memory. That severely limits the number of blocks (and thus files) to a level several orders of magnitude too small for many applications.

  • Recent support for name node federation in HDFS does not change scalability limits in HDFS significantly because scaling with federation increases the level of manual administrative effort superlinearly. The federation structure is also visible in the file system names, which causes the structure to leak into service implementations. These issues mean that federation increases scalability by only a small factor at best; supporting 10 billion files would require several hundred primary and backup name nodes.
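The "several hundred name nodes" figure in the last point can be checked with rough arithmetic. The sketch below assumes, purely for illustration, that a single name node can track somewhere between 50 million and 100 million files and that each primary has one backup. Those capacity figures are an assumption made here, not numbers from this chapter, and real limits depend on heap size and on how many blocks each file has.

```python
def name_nodes_needed(total_files, files_per_name_node, backups_per_primary=1):
    """Count primary plus backup name nodes for a federated namespace."""
    # Ceiling division: a partly filled name node still counts as one.
    primaries = -(-total_files // files_per_name_node)
    return primaries * (1 + backups_per_primary)


TOTAL_FILES = 10_000_000_000  # the 10 billion files mentioned above

# Assumed per-name-node capacities (illustrative only).
optimistic = name_nodes_needed(TOTAL_FILES, 100_000_000)
pessimistic = name_nodes_needed(TOTAL_FILES, 50_000_000)
```

Under these assumptions the estimate lands between 200 and 400 name nodes, each of which has to be configured and mapped into the namespace by hand.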

The simplistic design of HDFS made it much easier to implement originally, but it also made it much harder to work with, except with programs specially designed to accommodate its limitations. Spark and Hadoop MapReduce are two examples of programming frameworks that support this, but almost all conventional databases, for example, cannot run on HDFS even in principle.

Of course, the limitations of HDFS were well understood from the beginning. The design was a reasonable one for the limited original use case. The problem really comes with attempts to use HDFS for different kinds of workloads or where the limits on file count or metadata update rates are important.

Beyond Files: Tables, Streams, Audits, and Object Tiering

As mentioned earlier, MapR-FS supports a number of capabilities not normally found in file systems, distributed or not. These capabilities allow a wide range of applications to be built on a single cluster without needing additional clusters for databases or message streams. These capabilities also allow you to connect multiple clusters together to form a data fabric.

MapR DB Tables

One such extended capability is the ability to store tables in directories anywhere that you can store a file. These tables are accessible using table operations analogously to the way that a file is accessible using file operations. Looked at one way, tables in MapR-FS are an extension of a file system beyond what you can do with files alone, but from a different point of view, these tables could be seen as an extension to the idea of databases to allow tables to live in directories, allowing them to have path names and permissions just like files. Regardless of the perspective you choose, you can access tables in MapR-FS using either the Apache HBase API or the OJAI API, an open source document-oriented table interface that supports nested data formats such as JSON. The HBase API provides binary access to keys and values, whereas the document-oriented OJAI is more in the style of MongoDB and is generally preferred for new applications because of the built-in nested data structure.

Records in MapR tables are associated with a key and are kept in order according to the key. Each table is split into tablets with each tablet containing data corresponding to a particular range of keys. Each tablet is further divided into smaller units called segments that actually do the real work. As data is inserted, these tablets can grow larger than a configurable size. When this is detected, tablets are split by copying segments to a new copy of the tablet. An effort is made to avoid network traffic during this copy but still leave the new tablets on different machines in the cluster. The underlying implementation for MapR tables inherently avoids long delays due to compactions. The result of this and other technical characteristics is much better availability and reliability than Apache HBase, especially under heavy load. You can find an example of the practical impact in the Aadhaar project mentioned in Chapter 1, where the system meets strict latency guarantees.
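The way a table is divided into key ranges and split as it grows can be sketched in a few lines. This is a simplified model, not the MapR implementation: it ignores segments and machine placement, it splits on row count where the real system uses a configurable size, and the class and parameter names are hypothetical.

```python
import bisect


class Table:
    """Toy table split into tablets, each covering a contiguous key range."""

    def __init__(self, max_rows_per_tablet):
        self.max_rows = max_rows_per_tablet
        self.start_keys = [""]   # tablet i holds keys >= start_keys[i]
        self.tablets = [{}]      # each tablet maps row key -> record

    def _tablet_index(self, key):
        # The owning tablet is the last one whose start key is <= key.
        return bisect.bisect_right(self.start_keys, key) - 1

    def put(self, key, record):
        i = self._tablet_index(key)
        self.tablets[i][key] = record
        if len(self.tablets[i]) > self.max_rows:
            self._split(i)

    def get(self, key):
        return self.tablets[self._tablet_index(key)].get(key)

    def _split(self, i):
        # Divide an oversized tablet at its median key.
        keys = sorted(self.tablets[i])
        mid = keys[len(keys) // 2]
        lower = {k: v for k, v in self.tablets[i].items() if k < mid}
        upper = {k: v for k, v in self.tablets[i].items() if k >= mid}
        self.tablets[i] = lower
        self.start_keys.insert(i + 1, mid)
        self.tablets.insert(i + 1, upper)


table = Table(max_rows_per_tablet=4)
for key in "abcdefghij":
    table.put(key, {"value": key.upper()})

tablet_sizes = [len(t) for t in table.tablets]
```

Lookups stay cheap after any number of splits because finding the right tablet is a binary search over the start keys.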

Tables in MapR can have records describing all changes written to a message stream using a change data capture (CDC) feature. Because message streams in MapR-FS are well ordered, you can use this data to reconstruct a table, possibly in another technology (such as a search engine) or in masked or aggregated form. The use of CDC is described in Chapter 5 in the Table Transformation and Percolation design pattern.

You also can configure tables so that all changes are replicated to a remote copy of the table. Table replication is near real time and can use multiple network connections to allow very high replication rates. Within any single cluster, updates to tables are strongly consistent. Replication between clusters is bidirectional and can be delayed by a network partition; conflicting updates to the same row are resolved using timestamps.

To support multimaster updates, it is common to devote individual columns or document elements to individual source clusters. You could use this, for example, to support multimaster increments of counters by having a single counter column per cluster in the data fabric. Reading a row gives an atomic view of the most recent value for the counters for each cluster. Adding these together gives a good, if slightly out-of-date, estimate of the total count for all clusters. Local strong consistency means that the column increments will be safe, and because each column comes from only a single cluster, replication is safe. Similar techniques are easily implemented for a variety of similar tasks such as IoT reporting.
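The counter-per-cluster technique can be sketched as follows. This is a toy in-memory model, not the MapR table API. The class, method, and cluster names are hypothetical, and replication is reduced to copying each cluster's own column to the other side.

```python
class ClusterTable:
    """One cluster's copy of a table with a counter column per cluster."""

    def __init__(self, cluster_name):
        self.cluster_name = cluster_name
        self.rows = {}  # row key -> {cluster name: count}

    def increment(self, row_key, amount=1):
        # A cluster only ever writes the column named after itself.
        row = self.rows.setdefault(row_key, {})
        row[self.cluster_name] = row.get(self.cluster_name, 0) + amount

    def replicate_to(self, other):
        # Ship only this cluster's own column. No other cluster writes it,
        # so overwriting the remote copy cannot lose an update.
        for row_key, row in self.rows.items():
            if self.cluster_name in row:
                remote_row = other.rows.setdefault(row_key, {})
                remote_row[self.cluster_name] = row[self.cluster_name]

    def total(self, row_key):
        # Sum the per-cluster columns to estimate the global count.
        return sum(self.rows.get(row_key, {}).values())


east = ClusterTable("east")
west = ClusterTable("west")

east.increment("page-views", 3)
west.increment("page-views", 2)
before_replication = east.total("page-views")

east.replicate_to(west)
west.replicate_to(east)
after_replication = (east.total("page-views"), west.total("page-views"))

# A later increment in the west is visible in the east only after replication.
west.increment("page-views")
stale_east = east.total("page-views")
west.replicate_to(east)
fresh_east = east.total("page-views")
```

The total read on any one cluster may lag behind, but it never double counts and never loses an increment, because no two clusters ever write the same column.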

Message Streams

In MapR-FS, message streams are first-class objects, as are files and tables. You can write to and read from message streams using the Apache Kafka API, although the implementation of message streams in MapR-FS shares no code with Kafka itself.

Each stream logically consists of a number of topics, themselves divided, Kafka-style, into partitions. Messages within partitions are ordered. Topics in different streams are, however, completely independent. This allows two instances of the same application to run independently using different streams, even though they might use the same topic names. To achieve the same level of isolation while avoiding the risk of topic name collision, two Kafka-based applications would have to use separate Kafka clusters.

Physically, MapR streams are implemented on top of the primitive B-tree structures in MapR by using a combination of topic, partition, and message offset as the key for a batch of messages. The batching of messages allows very high write and read throughput, whereas the unification of the entire stream in a single distributed data structure means that the same low-level mechanisms for distributing table and file operations can be repurposed for streams. This allows, for instance, all permission, security, encryption, and disaster recovery mechanisms to apply to streams with no special code required to make it so.
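A rough sketch of this keying scheme follows. It is a simplification built on a sorted in-memory list, not the MapR implementation: batches here have a fixed number of messages and are aligned to multiples of the batch size, which is an assumption made to keep the example short. All names are hypothetical.

```python
import bisect


class StreamTree:
    """Toy ordered store keyed by (topic, partition, first offset of batch)."""

    def __init__(self, batch_size):
        self.batch_size = batch_size
        self.keys = []          # sorted (topic, partition, first_offset) keys
        self.batches = {}       # key -> list of messages in that batch
        self.next_offset = {}   # (topic, partition) -> next offset to assign

    def append(self, topic, partition, message):
        tp = (topic, partition)
        offset = self.next_offset.get(tp, 0)
        first = offset - offset % self.batch_size
        key = (topic, partition, first)
        if key not in self.batches:
            bisect.insort(self.keys, key)
            self.batches[key] = []
        self.batches[key].append(message)
        self.next_offset[tp] = offset + 1
        return offset

    def read_from(self, topic, partition, offset):
        """Return the messages of one partition starting at the given offset."""
        # Locate the batch with the largest first offset that is <= offset.
        i = bisect.bisect_right(self.keys, (topic, partition, offset)) - 1
        out = []
        while 0 <= i < len(self.keys) and self.keys[i][:2] == (topic, partition):
            first = self.keys[i][2]
            batch = self.batches[self.keys[i]]
            out.extend(batch[max(0, offset - first):])
            i += 1
        return out


stream = StreamTree(batch_size=3)
for n in range(7):
    stream.append("pump-1", 0, f"m{n}")
stream.append("pump-2", 0, "x0")

replayed = stream.read_from("pump-1", 0, 4)
```

One key lookup finds the batch that contains the requested offset, and the rest of the read is a sequential scan over adjacent keys. That is why batching helps throughput without giving up per-message offsets.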

The MapR platform itself uses streams internally for a variety of purposes. The table CDC function uses streams to carry table updates. The audit system delivers audit messages using streams. System metrics that record operational volumes and latency summaries are also carried in streams.

You can replicate streams to remote clusters in much the same way as tables. The pattern of replication can be bidirectional, many-to-one, or even have loops. For consistency, you should write each topic in a set of stream replicas in only one cluster. Message offsets are preserved across all stream replicas so that messages can be read from different clusters interchangeably once they have arrived.

Auditing

In earlier chapters, we talked about the importance of having fine-grained control over data and applications, and that requires knowing what is going on. Transparency into your systems helps with this, for example when you need to find bottlenecks such as those described at the end of Chapter 2.

With MapR, it is possible to turn on selective levels of audit information for any volumes in a system. With auditing enabled, information about object creation, deletion, update, or reads can be sent to a message stream in an easily parsed JSON format.

You can use audit information in a variety of ways, the most obvious being monitoring of data creation and access. This is very useful when combined with automated anomaly detection in which a predictive model examines access and creation of data for plausibility against historical patterns.

You can use audit information in other ways. For instance, by monitoring audit streams, a metadata extraction program can be notified about new files that must be analyzed or modified files whose metadata needs updating. You could imagine the creation of thumbnails for videos being triggered this way or the extraction of plain text from PDF documents. Audit streams are also excellent ways to trigger indexing of files for search purposes.
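A consumer of such an audit stream might look roughly like the sketch below. The JSON field names (`operation`, `path`), the operation values, and the action names are all hypothetical, chosen only to illustrate the pattern; the actual audit record layout should be taken from the platform documentation. The messages are supplied as a plain list of strings here, where a real program would read them from the stream.

```python
import json


def plan_actions(audit_lines):
    """Map audit records to follow-up work for a metadata extraction service."""
    actions = []
    for line in audit_lines:
        record = json.loads(line)
        operation = record.get("operation")   # hypothetical field name
        path = record.get("path", "")         # hypothetical field name
        if operation not in ("CREATE", "UPDATE"):
            continue  # reads and deletes need no metadata refresh here
        if path.endswith(".pdf"):
            actions.append(("extract_text", path))
        elif path.endswith((".mp4", ".mov")):
            actions.append(("make_thumbnail", path))
        else:
            actions.append(("index_for_search", path))
    return actions


sample_audit_lines = [
    '{"operation": "CREATE", "path": "/data/reports/q1.pdf"}',
    '{"operation": "READ", "path": "/data/reports/q1.pdf"}',
    '{"operation": "UPDATE", "path": "/data/video/site-tour.mp4"}',
    '{"operation": "CREATE", "path": "/data/notes/readme.txt"}',
]

planned = plan_actions(sample_audit_lines)
```

Keeping the decision logic in a pure function like this makes it easy to test against recorded audit messages before connecting it to a live stream.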

Object Tiering

By default, all of the data in a MapR cluster is triplicated to provide resiliency against hardware failure and maximal performance. Replication applies to file, table, and stream data, as well as directory and volume metadata. It is common, however, that data is used intensively right after it is created, but usage drops off after that. Data that is rarely accessed may still need to be retained for a long time, however, possibly decades. Triplicating data in such cases gives no performance advantage but still costs space. It is important to differentially optimize how data is stored for speed, space, or cost.

Such quiescent data can be converted from triplicated form to an erasure-coded form that takes up less than half the space but has as good or better ability to survive multiple hardware failures. Conversion is controlled by administrative policies based on age and rate of access to data. As properties of volumes, these policies can vary from volume to volume.
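The space claim can be checked with simple arithmetic. The sketch below assumes a generic erasure code that stores each stripe as some number of data fragments plus some number of parity fragments and can survive the loss of as many fragments as it has parity fragments (a standard property of Reed-Solomon style codes). The particular 10+2 and 10+3 layouts are illustrative assumptions, not necessarily the schemes the platform offers.

```python
REPLICATION_FACTOR = 3  # triplication keeps three full copies


def storage_overhead(data_fragments, parity_fragments):
    """Raw bytes stored per byte of user data for a data+parity layout."""
    return (data_fragments + parity_fragments) / data_fragments


def space_relative_to_triplication(data_fragments, parity_fragments):
    """Fraction of the triplicated footprint that the erasure code needs."""
    return storage_overhead(data_fragments, parity_fragments) / REPLICATION_FACTOR


def failures_survived_by_replication(copies=REPLICATION_FACTOR):
    """With n full copies, data survives the loss of n - 1 of them."""
    return copies - 1


# Assumed layouts: 10 data + 2 parity, and 10 data + 3 parity.
ratio_10_2 = space_relative_to_triplication(10, 2)
ratio_10_3 = space_relative_to_triplication(10, 3)
```

Under these assumptions, a 10+2 layout uses 40% of the space of triplication while tolerating the same two failures, and a 10+3 layout tolerates three failures while still using less than half the space.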

Eventually, it may become desirable to absolutely minimize the cost of storing data to the point that speed of access to the data becomes an almost irrelevant consideration. In such cases, you can copy objects to a very low-cost object store such as Amazon Web Services’ Simple Storage Service (Amazon S3) or the Microsoft Azure object store.

In any case, the data will still be accessible using the same path name and same operations as ever, no matter how data is stored. Data can also be recalled to the speed tier at any time if it becomes important to optimize performance again. By preserving the metadata and providing uniform access methods, MapR clusters even allow updates to files, tables, and streams that have been copied to immutable storage like Amazon S3.
