Structured streaming comes to Apache Spark 2.0

The O’Reilly Data Show Podcast: Michael Armbrust on enabling users to perform streaming analytics, without having to reason about streaming.

By Ben Lorica
May 19, 2016
Snow in Milan. (source: Giovanni Dall'Orto on Wikimedia Commons)

With the release of Spark version 2.0, streaming starts becoming much more accessible to users. By adopting a continuous processing model (on an infinite table), the developers of Spark have enabled users of its SQL or DataFrame APIs to extend their analytic capabilities to unbounded streams.

Within the Spark community, Databricks engineer Michael Armbrust is well known for having led the long-term project to move Spark’s interactive analytics engine from Shark to Spark SQL. (Full disclosure: I’m an advisor to Databricks.) Most recently, he has turned his efforts to helping introduce a much simpler stream processing model to Spark Streaming (“structured streaming”).

Tackling these problems at large scale, in a popular framework with many production deployments, is challenging. So think of Spark 2.0 as the opening salvo. Just as it took a few versions before a majority of Spark users moved over to Spark SQL, I expect the new structured streaming framework to improve and mature over the next few releases of Spark.

Here are some highlights from our conversation:

Replacing Shark with Spark SQL

I joked that when I joined Databricks my goal was to kill Shark. I mean, Shark was a great idea. I think it turns out that SQL is the language for a wide range of users. Having the ability to take SQL queries and execute them using the Apache Spark run time is an incredibly powerful way to accomplish a whole bunch of big data tasks. The problem with Shark was it was using too much of Hive. It was using Hive’s optimizer and Hive’s query planner, and then really just taking the execution and running on top of Spark. It was exactly the problems that I was seeing when I was at Google … It wasn’t understanding the Spark run time, and it was going to be very difficult for us to adapt it. This was a perfect place to try and apply some of those ideas of how to build an extensible optimizer. That is what became Catalyst.

… We dropped the first version of Spark SQL in Spark 1.0. At that point, it was experimental. It only did SQL, but we already had an idea that this was going to be a pretty exciting part of the project. … Another major turning point in the history of Spark SQL is Spark 1.3, when we added the DataFrame API. That was really the moment where we realized that not only was it going to replace Shark, but actually it was going to start to supplement RDDs.

… One of the things that I wasn’t expecting when I came to Databricks and started working on Spark is just how active the community is. Within I think two releases we were already up to a hundred people having contributed to just the SQL part … a velocity and excitement that I think is unprecedented in a lot of different projects.

Structured streaming: queries and analytics on tables that are growing

We wanted to preserve all the niceties and the high-level APIs that people like, but we wanted to really rethink what’s going on underneath the covers and make the optimization and planning of how to actually accomplish the query automatic through Catalyst and Tungsten. The right way to think about the Spark structured streaming API is, it’s just DataFrames. The line we’ve been using is: ‘The simplest way to perform streaming analytics is not having to reason about streaming.’ You use the exact same DataFrame or SQL API that you know and love, but now instead of applying it to batch data sources where all the data is there at the beginning, you apply it to data sources where data is going to be continuously arriving.

What this means is that when you come up with your query flow, instead of saying collect (execute a batch job and return the answer), you say start, which actually begins what’s called the continuous query, which runs over and over again every time new data is available. Spark will automatically figure out how to incrementally process only what has arrived since the last time we did some processing, and then output the answer in an efficient way.
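
Editor’s illustration: the idea of incremental execution can be sketched in a few lines of plain Python rather than Spark. The ContinuousCount class, its trigger method, and the sample rows are hypothetical names invented for this example. The sketch only shows a query that remembers how far it has read and touches just the rows that arrived since then; it makes no claim about how Spark implements this internally.

```python
from collections import Counter


class ContinuousCount:
    """Toy continuous query: count rows per user over an append-only table."""

    def __init__(self, table):
        self.table = table        # the ever-growing input (a plain list here)
        self.offset = 0           # number of rows already processed
        self.counts = Counter()   # running answer, updated incrementally

    def trigger(self):
        """Process only the rows appended since the previous trigger."""
        new_rows = self.table[self.offset:]
        self.counts.update(row["user"] for row in new_rows)
        self.offset = len(self.table)
        return dict(self.counts)


def batch_count(table):
    """The equivalent one-shot batch query over the whole table."""
    return dict(Counter(row["user"] for row in table))


table = [{"user": "ann"}, {"user": "bob"}]
query = ContinuousCount(table)
first = query.trigger()                  # sees the first two rows

table.extend([{"user": "ann"}, {"user": "cat"}])
second = query.trigger()                 # touches only the two new rows

print(first)
print(second == batch_count(table))
```

The incremental answer after the second trigger matches what a batch query over the full table would return, which is the property the user relies on.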

… Another part is, we actually built in some of the primitives that the original version of Spark Streaming would make you construct by hand. A really good example of this is event time aggregation. A super common use case: I have a stream of events coming in and I just want to count how many records per second are arriving. The problem is that data might arrive late. If data shows up five minutes late, I will attribute it to the wrong timestamp. What you want to do with event time is, instead of basing the time on when the data actually arrives in the processing system, you want to use the timestamp that’s actually recorded in the data itself.

You could do this with Spark Streaming, but it was up to you as a user to actually maintain the hash table and update the counts from the data that arrived before. With structured streaming, by contrast, you just say group by this time column, and do an aggregation. It automatically figures out how to do the incremental processing for you.
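
Editor’s illustration: the difference between arrival time and event time can be shown with a hand-maintained hash table, which is roughly the bookkeeping described above as the user’s job in the older model. This is a plain-Python sketch, not Spark code; the record layout and the count_by helper are assumptions made for the example.

```python
from collections import defaultdict

# Each record is (arrival_second, event_second): when it reached the system
# versus the timestamp recorded in the data itself.
records = [
    (0, 0),
    (0, 0),
    (1, 1),
    (2, 0),   # late: produced in second 0, arrived in second 2
    (2, 2),
]


def count_by(records, index):
    """Maintain a hash table of counts keyed by the chosen timestamp field."""
    counts = defaultdict(int)
    for record in records:
        counts[record[index]] += 1
    return dict(counts)


by_arrival = count_by(records, 0)   # attributes the late record to second 2
by_event = count_by(records, 1)     # attributes the late record to second 0

print(by_arrival)
print(by_event)
```

Keyed by arrival time, the late record inflates the count for second 2; keyed by event time, it is credited to second 0, where it belongs.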

Guarantees for end-to-end pipelines

I think the hard part of streaming is the fact that it’s running all the time and you can’t mess up exactly-once semantics when there are failures. There will be failures if you’re running for a very long time. The problem when you don’t own the entire end-to-end pipeline is that, for whichever parts are not owned by Spark, you’re basically punting to the user the problem of: how do I update this database in an idempotent way, how do I update it in a way that provides me isolation or whatever consistency semantics you’re looking for? By having really tight integration with both the sources and the sinks for the stream, Spark actually owns the fault-tolerance from the beginning to the end, and doesn’t have to punt those hard problems to the user.
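
Editor’s illustration: one common way to make a sink update idempotent is to tag each batch with an identifier and skip identifiers that were already committed. The plain-Python sketch below assumes that technique; the IdempotentSink and NaiveSink classes and the batch ids are hypothetical, and the interview does not say this is how Spark’s sinks work.

```python
class NaiveSink:
    """Applies every write it receives, including replays."""

    def __init__(self):
        self.total = 0

    def write(self, batch_id, rows):
        self.total += sum(rows)


class IdempotentSink:
    """Remembers committed batch ids so a replayed batch has no effect."""

    def __init__(self):
        self.total = 0
        self.committed = set()

    def write(self, batch_id, rows):
        if batch_id in self.committed:
            return                       # replay after a failure: ignore
        # In a real store these two updates would have to be atomic.
        self.total += sum(rows)
        self.committed.add(batch_id)


batches = [(0, [1, 2]), (1, [3]), (2, [4, 5])]
# Simulate a crash after batch 1 was written but before progress was
# recorded, so the engine replays batch 1 on restart.
deliveries = [batches[0], batches[1], batches[1], batches[2]]

naive, idempotent = NaiveSink(), IdempotentSink()
for batch_id, rows in deliveries:
    naive.write(batch_id, rows)
    idempotent.write(batch_id, rows)

print(naive.total, idempotent.total)
```

The naive sink counts the replayed batch twice, while the idempotent sink ends with the same total as a failure-free run.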

Toward online learning and lower latency

We’ve been working very closely with the machine learning team to make sure that we have the right APIs in structured streaming to be able to do things like online training of a model. As data arrives, you apply it to this model and then with each timestamp, you just get the most up-to-date copy of the model. That’s one of the advanced use cases. … It does turn out that only certain types of algorithms can be adapted to be online.
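
Editor’s illustration: stochastic gradient descent is a standard example of an algorithm that adapts naturally to online training, because each record updates the model on its own. The plain-Python sketch below fits a one-parameter model y ≈ w * x; the online_update function, the learning rate, and the synthetic stream are assumptions chosen for the example and are unrelated to any Spark API.

```python
def online_update(w, x, y, learning_rate=0.05):
    """One stochastic-gradient step on squared error for the model y = w * x."""
    error = y - w * x
    return w + learning_rate * error * x


# Synthetic stream whose true relationship is y = 2 * x.
stream = [(x, 2.0 * x) for _ in range(30) for x in (1.0, 2.0, 3.0)]

w = 0.0
snapshots = []           # the most up-to-date model after each record
for x, y in stream:
    w = online_update(w, x, y)
    snapshots.append(w)

print(round(w, 6))
```

Every entry in snapshots is a usable model as of that record, which mirrors the idea of always having the most recent copy of the model available.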

… If you look at the kind of initial version of Spark Streaming, I think one of the biggest pain points was that batches were part of the API. If you wanted to do things like windowing, you actually had to reason about the batch interval relative to the windowing interval. The choices that you made there in terms of execution would actually affect the semantics of the answer that you are getting back. … If you look at the API for structured streaming, there’s nothing about batches in it — and that was a very conscious decision to leave that out. Basically, no matter what the execution strategy is, we can still execute your query correctly and give you the right answer with exactly-once semantics.

… Your code will return the correct answer no matter what’s going on underneath the covers, no matter what the micro batch interval is, or what’s actually happening in execution. What that means is two things: first of all, it means that some day we’ll have the freedom to switch the underlying execution engine to be tuple-at-a-time instead of batch execution. That’s one way to look at it, and I think actually this is something that Matei Zaharia is currently investigating with some of his students at MIT: what would that system look like, and how would you decide which execution engine to use? … What they are looking at is, when does it make sense to actually switch the execution engine underneath, in a way that’s invisible to the user, when does micro batching make sense, and when does pipelined tuple-at-a-time processing make sense.
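
Editor’s illustration: the claim that the answer should not depend on the execution strategy can be checked on a toy example. The plain-Python sketch below runs the same event-time count with different micro-batch sizes, including a size of one as a stand-in for tuple-at-a-time execution. The run and rows_per_batch helpers and the sample events are hypothetical, and this is not a model of Spark’s engine.

```python
from collections import Counter

# Event-time seconds of nine records, listed in arrival order.
events = [0, 0, 1, 0, 2, 1, 2, 2, 2]


def run(events, batch_size):
    """Count records per event-time second, consuming batch_size at a time."""
    counts = Counter()
    for start in range(0, len(events), batch_size):
        counts.update(events[start:start + batch_size])
    return dict(counts)


def rows_per_batch(events, batch_size):
    """A batch-dependent quantity: how many rows each micro-batch held."""
    return [len(events[i:i + batch_size])
            for i in range(0, len(events), batch_size)]


tuple_at_a_time = run(events, 1)
micro_batched = run(events, 4)
single_batch = run(events, len(events))

print(tuple_at_a_time == micro_batched == single_batch)
print(rows_per_batch(events, 4))
```

The event-time counts are identical for every batch size, whereas a quantity defined in terms of batches, such as rows per batch, changes with the execution choice.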

… To give you an idea of where I think certain features are going to drop: in Spark 2.0, it will be reading and writing from files, and we will have the basics of event time aggregation. In the future, the things that we’re looking to do are native sessionization, integration with machine learning, and then really optimizing the performance and the latency, as well as the integration with all of the different systems that you want.

Editor’s note: Upcoming presentations on structured streaming include a Strata + Hadoop World London 2016 presentation by Tathagata Das and a Spark Summit presentation by Michael Armbrust (Spark Summit attendees use “OReilly16” as a promo code for 20% off).
