O'Reilly logo

Learning Spark, 2nd Edition by Tathagata Das, Brooke Wenig, Denny Lee, Jules Damji

Stay ahead with the world's most comprehensive technology and business learning platform.

With Safari, you learn the way you learn best. Get unlimited access to videos, live online training, learning paths, books, tutorials, and more.

Start Free Trial

No credit card required

Chapter 4. Spark SQL and DataFrames Introduction to Built-in Data Sources

In the previous chapter, we explained the evolution and justification of structure in Spark. In particular, we discussed how the Spark SQL engine is the foundation on which the unification of high-level DataFrames and Datasets are built. Now, we continue with our discussion of DataFrames and explore its interoperability with Spark SQL.

This chapter and the next also explore how Spark SQL interfaces with some of the external components as shown in Figure 4-1. In particular, Spark SQL provides the following capabilities:

  • It provides an engine upon which high-level abstraction structured APIs such as DataFrames and Datasets are built (which we explored in chapter 3).

  • It can read and write data in a variety of structured formats (e.g., JSON, Hive Tables, Parquet, Avro, ORC, CSV, etc.)

  • It lets you query data using the JDBC/ODBC connectors from the external Business Intelligence data sources such as Tableau, Power BI, Talend or Relational Database Management Systems (RDBMS) such as MySQL, PostgreSQL, etc.

  • It provides a programmatic interface to interact with structured data stored as tables or views in a database from a Spark application.

  • It offers an interactive shell to issue SQL queries to your structured data.

  • It supports ANSI SQL:2003 commands1 2.

Spark SQL Usage and Interface
Figure 4-1. Spark SQL Usage and Interface

So let’s begin with how you can use Spark SQL in a Spark application.

Using Spark SQL in Spark Applications

Before Apache Spark 2.0, to use Spark SQL in your Spark application, you had to create a SparkSQLContext or a HiveContext instance to issue SQL queries. This was because the Spark 1.x series created distinct contexts to interact with distinct data sources. For example, to interact with a streaming source you needed a StreamingContext; to modify or access Spark configurations, you needed a SparkConf—all these contexts can be unruly to manage for a Spark developer.

With the introduction of SparkSession as part of the unification effort in Spark 2.0, a SparkSession instance subsumes all these contexts, including SparkSQLContext.3 You just import the SparkSession and create an instance in your code—and now you have access to all the contexts from this instance, without explicitly naming or accessing them.

//  In Scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession
        .builder()
        .appName(“SparkSQLApp”)
        .getorCreate()

# In Python
from pyspark.sql import SparkSession
spark = (SparkSession
        .builder()
        .appName(“SparkSQLApp”)
        .getorCreate())

To issue any SQL query, use the sql() method on the SparkSession instance, spark. Assuming that you have an existing SQL table called us_delay_flights_tbl in your Hive metastore (more on that shortly), you can issue SQL queries against the table us_delay_flights_tbl using the spark.sql programmatic interface.

// In Scala
val df = spark.sql(“SELECT date, delay, distance, origin, destination FROM us_delay_flights_tbl”)
val df2= spark.sql(“SELECT * from us_delay_flights_tbl”””)

# In Python
df = spark.sql(“SELECT date, delay, distance, origin, destination FROM us_delay_flights_tbl”)
df2 = spark.sql(“SELECT * FROM us_delay_flights_tbl”)

All spark.sql queries in this manner return a DataFrame on which you may perform further Spark operations if you desire—the kind we explored in Chapter 3 and the ones you will learn in this chapter and the next.

Basic Query Example

Let’s walk through a few examples of queries on a data set of US flight delays with date, delay, distance, origin, and destination. 4 In our example, we will load a CSV file with over a million records. Using a schema for the CSV, we read data into a DataFrame and register the DataFrame as a temporary view (more on temporary views shortly) so we can query it with SQL.

For query examples, see all the code snippets in Examples 4-1 through 4-5, and for the entire example notebook in Python and Scala, see the code in the Learning Spark v2 GitHub5 repo. All these examples of SQL queries offer you a taste in how to use SQL in your Spark application using the spark.sql programmatic interface.6 Similar to the DataFrame API, this interface lets you query structured data in your Spark application.

Normally, in a standalone Spark application, you will have to create a SparkSession instance, as shown in the example below. However, in a Spark shell or Databricks notebook, the SparkSession is created for you, named appropriately as spark.

Example 4-3. Example 4-1. Creating SparkSession loading US departure flight data set and registering a temporary view in Scala
val csvFile = …         // path to your dataset; 
val spark = SparkSession
         .builder
         .appName("SparkSQLExampleApp")
.getOrCreate()
// define our schema as before
val schema = StructType(StructField("date", StringType, false),
         StructField("delay", IntegerType, false),
         StructField("distance", IntegerType, false),
StructField("origin", StringType, false),
StructField("destination", StringType, false))
// read and create a temporary view
spark.read.schema(schema).csv(csvFile).createOrReplaceTempView("us_delay_flights_tbl")
Example 4-3. Example 4-2. Creating SparkSession loading US departure flight data set and registering a temporary view in Python
     csvFile = …             # path to your dataset
       # create a SparkSession
       spark = (SparkSession
                .builder
                .appName("SparkSQLExampleApp")
                .getOrCreate())
      # define a schema
      schema = StructType([
           StructField("date", StringType(), False),
           StructField("delay", IntegerType(), False),
           StructField("distance", IntegerType(), False),
           StructField("origin", StringType(), False),
           StructField("destination", StringType(), False)])
    # read the data and create a temporary view spark.read.schema(schema).csv(csv_file).createOrReplaceTempView("us_delay_flights_tbl")
Note

Another way to define this schema is to use SQL DDL string. You may choose to use either. For example:

    // in Scala
    val schema = “date STRING, delay INT, distance INT, origin STRING”, destination STRING”
    # In Python
    schema = “`date` STRING, `delay`INT, `distance` INT, `origin` STRING, `destination` STRING”

 

 

Now that we have a temporary view, we can issue SQL queries using Spark SQL. These SQL queries are no different than those you may issue against a SQL table, say in MySQL or PostgreSQL database. The point here is to show that Spark SQL offers ANSI 2003 compliant SQL interface—and the interoperability between SQL and DataFrames.

So let’s have a go at some queries against our US departures flight data set mentioned above. The data set has five columns:

D ate

string e.g.,02190925, when converted maps to 02-19 09:25 am

D elay

in minutes from the scheduled time; a negative number denotes late arrival of the flight to the airport of origin, and positive denotes a late departure?

D istance

in miles from the origin airport to the destination airport

origin

origin IATA airport code

destination

destination IATA airport code

Example 4-4. Example 4-3. SQL Query-1: Find all flights whose distance between origin and destination is greater than 1000 miles in Scala or Python
spark.sql(“SELECT distance, origin, destination 
FROM us_delay_flights_tbl WHERE distance > 1000 
ORDER BY distance DESC”).show(10)

This query produces a result that looks as follows, showing the top 10 farthest in distance from Honolulu to New York.

+--------+------+-----------+
|distance|origin|destination|
+--------+------+-----------+
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
+--------+------+-----------+
only showing top 10 rows
Example 4-6. Example 4-4. SQL Query-2: Find all flights with at least 2 hour delays between San Francisco (SFO) and Chicago (ORD) in Scala or Python
spark.sql("SELECT date, delay, origin, destination 
FROM us_delay_flights_tbl 
WHERE delay> 120 AND ORIGIN = 'SFO' AND DESTINATION = 'ORD' 
ORDER by delay DESC").show(10)

+--------+-----+------+-----------+
|date    |delay|origin|destination|
+--------+-----+------+-----------+
|02190925|1638 |SFO   |ORD        |
|01031755|396  |SFO   |ORD        |
|01022330|326  |SFO   |ORD        |
|01051205|320  |SFO   |ORD        |
|01190925|297  |SFO   |ORD        |
|02171115|296  |SFO   |ORD        |
|01071040|279  |SFO   |ORD        |
|01051550|274  |SFO   |ORD        |
|03120730|266  |SFO   |ORD        |
|01261104|258  |SFO   |ORD        |
+--------+-----+------+-----------+
only showing top 10 rows

It seems there were many significantly delayed flights between San Francisco and Chicago on different dates. (As an exercise, convert date column into readable format and ascertain what days or months were these delays? Were they related to winter months or holidays?)

Let’s try a more complicated query where we use the CASE clause in SQL. We want to label all US flights originating from airports with high, medium, low, no delays, regardless of destinations.

Example 4-7. Example 4-5. SQL Query-3: A more complicated query in SQL: let’s label all US flights with a human readable label: Very Long Delay (> 6 hours), Long Delay (2 - 6 hours), etc. in a new column called “Flight_Delays” in Scala or Python
spark.sql("SELECT delay, origin, destination,
              CASE
                  WHEN delay > 360 THEN 'Very Long Delays'
                  WHEN delay > 120 AND delay < 360 THEN  'Long Delays'
                  WHEN delay > 60 AND delay < 120 THEN  'Short Delays'
                  WHEN delay > 0 and delay < 60  THEN   'Tolerable  Delays'
                  WHEN delay = 0 THEN 'No Delays'
                  ELSE 'Early'
               END AS Flight_Delays
               FROM us_delay_flights_tbl
               ORDER BY origin, delay DESC").show(10)

+-----+------+-----------+-------------+
|delay|origin|destination|Flight_Delays|
+-----+------+-----------+-------------+
|333  |ABE   |ATL        |Long Delays  |
|305  |ABE   |ATL        |Long Delays  |
|275  |ABE   |ATL        |Long Delays  |
|257  |ABE   |ATL        |Long Delays  |
|247  |ABE   |DTW        |Long Delays  |
|247  |ABE   |ATL        |Long Delays  |
|219  |ABE   |ORD        |Long Delays  |
|211  |ABE   |ATL        |Long Delays  |
|197  |ABE   |DTW        |Long Delays  |
|192  |ABE   |ORD        |Long Delays  |
+-----+------+-----------+-------------+
only showing top 10 rows

As with DataFrames and Datasets APIs so with the spark.sql interface, you can conduct common data analysis we explored in the previous chapter; both the DataFrame as well as the spark.sql query computation undergo an identical journey in the Spark SQL engine (see Figure 3-2 in chapter 3), giving you the same results.

Note

All the three SQL queries above can be expressed with a DataFrame API equivalent query. For example, Query 1 can be expressed in the Python DataFrame API as:

from pyspark.sql.functions import col, desc

(df.select("distance", "origin", "destination")
.where(col("distance") > 1000)
.orderBy(desc("distance")).show())

Or

(df.select("distance", "origin", "destination")
   .where("distance > 1000")
   .orderBy("distance", ascending=False).show(10)

As an exercise, convert the other SQL code from above to use the DataFrame API.

Also, note the similarity and readability between DataFrame API query and its equivalent SQL one. Further, both SQL and DataFrame queries compute to the same results.

+--------+------+-----------+
|distance|origin|destination|
+--------+------+-----------+
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
+--------+------+-----------+
only showing top 10 rows

As you can see from the above queries, using the Spark SQL interface to query data feels like you are writing a regular SQL query to a relational database table. Although the queries are in SQL, you can feel the similarity in readability and semantics to DataFrames API operations, which you encountered in chapter 3—and will explore more in the next chapter.

For you to be able to query structured data, as shown in the above examples, Spark manages all the complexities of creating and managing views and tables, both in memory and on disk, which leads us to discuss how tables and views are created and managed.

SQL Tables and Views

Tables hold data. Associated with each table in Spark is its relevant metadata, which is information about a table and data, such as schema, description, table name, database name, column names, partitions, the physical location where the actual data resides, etc. All this is stored in a central metastore.

Instead of having a separate metastore for Spark tables, Spark uses Apache Hive metastore as its metastore located under the Hive warehouse default location: /user/hive/warehouse. Hive uses this location to persist all metadata about your table (e.g., schema, table name, columns, description, etc.). However, you may change the default location by setting the Spark config spark.sql.warehouse.dir to another location.

Unmanaged vs Managed Tables

Spark allows you to create two types of tables: unmanaged and managed. For managed tables, Spark manages both the metadata and data on the file store. This could be local file system, HDFS, or an object store such as Amazon S3 or Azure Blob.

Because Spark manages the table, a SQL command such as DROP table_name SQL deletes both the metadata and data. Whereas in an unmanaged table, the same command will delete only the metadata, not the actual data. For an unmanaged table, Spark only manages the metadata, while you can manage the data in an external data source such as Cassandra, JDBC or other.7 We will look at some examples of how to create managed and unmanaged tables below.

Creating SQL Databases and Tables

Tables reside within a Database. By default, Spark creates tables under a default database. To create your own Database name, you can issue a SQL command from your Spark application or notebook. Using the US delay flights dataset above, let’s create both a managed and unmanaged table.

spark.sql(“CREATE DATABASE learn_spark_db”)
spark.sql(“USE learn_spark_db”)

Any subsequent SQL commands for creating tables for the duration of a Spark application will reside under the database name learn_spark_db.

Creating a Managed Table

To create a managed table within the database learn_spark_db, you can use one of the two APIs from your Spark application:

For example, in SQL use:

spark.sql(“CREATE TABLE us_delay_flights_tbl (date STRING, delay INT, distance INT, origin STRING, destination STRING”)

In Python DataFrame API use:

# path to our US flight delay CSV file 
csv_file = . . .
# schema as defined in the above example
schema = “date STRING, delay INT, distance INT, origin STRING, destination STRING”
flights_df = spark.read.csv(csv_file, schema=schema)
flights_df.saveAsTable(“us_delay_flights_tbl”)

Either the Python or SQL statement above will create the managed table us_delay_flights_tbl under the database name learn_spark_db.

Creating an Unmanaged Table

By contrast, you can create an unmanaged table from your own data sources, say Parquet, CSV or JSON files stored in a file store accessible to your Spark application.

To create an unmanaged table from a data source such as JSON file, in SQL use:

spark.sql(“CREATE TABLE us_delay_flights_tbl(date STRING, delay INT, 
  distance INT, origin STRING, destination STRING) 
  USING json OPTIONS (PATH ‘path_to_us_flights_json_file’)”)

# From SQL 
CREATE TABLE us_delay_flights_tbl(date STRING, delay INT, 
distance INT, origin STRING, destination STRING) 
USING json OPTIONS (path "path_to_us_flights_json_file")

And within the DataFrame API use:

dataframe.write.option('path',"/tmp/data/us_flights_delay").saveAsTable("us_delay_flights_tbl")

For you to explore these examples, we have created a Python and Scala example notebook called spark_tables.py and SparkTables.scala in the Learning Spark v2 GitHub8 repo; you can run the corresponding notebooks.

Warning

When you delete an unmanaged table from a database, you are only deleting the metadata associated with it; the actual data still resides in the path specified during table creation. You will have to explicitly delete it.

Creating Views

In addition to creating tables, Spark can create views on top of existing tables. Views can be global (visible across all SparkSessions on a given cluster) or temporary (visible only to a single SparkSession), and they disappear after your Spark application or notebook terminates.

Like creating tables within a database, creating views has a similar syntax.9 Once you create a view, you can query it as you would a table. The difference between a view and table is that views allow you organize data with different logical lens or slices of the same table. It’s like creating multiple DataFrames with different transformational queries from the original DataFrame.

You can also create a view from an existing Table using SQL. For example, if you wish to work only on a subset of US flights with origin airports in New York (JFK) and San Francisco (SFO), the following queries will create a slice of the table as a view.

In SQL:

CREATE OR REPLACE TEMP VIEW us_origin_airport_SFO_global_tmp_view AS
  SELECT date, delay, origin, destination from us_delay_flights_tbl WHERE origin = ‘SFO’

CREATE OR REPLACE GLOBAL TEMP VIEW us_origin_airport_JFK_tmp_view AS
  SELECT date, delay, origin, destination from us_delay_flights_tbl WHERE origin = ‘JFK’

Alternatively, for a given a DataFrame, you can call its two methods to create temporary or global temporary views (we will discuss the difference between them shortly):

# In Python
df_sfo =  spark.sql(“SELECT date, delay, origin, destination  from us_delay_flights_tbl WHERE origin = ‘SFO’”)
df_jfk = spark.sql(“SELECT date, delay, origin, destination from us_delay_flights_tbl WHERE origin = ‘JFK’”)

# create a temporary and global temporary view
df_sfo.createOrReplaceGlobalTempView(“us_origin_airport_SFO_global_tmp_view”)
df_jfk.createOrReplaceTempView(“us_origin_airport_JFK_tmp_view”)

Let’s issue queries against both the temporary and global view. Keep in mind that, while accessing a global temporary view, you must use the prefix “global_temp.<view_or_table_name” because Spark creates global temporary view in a global temporary database called global_temp.

Once created, you can issue queries against these temporary views as you would against tables–and drop a view as you would a table.

In SQL use: 
SELECT * FROM global_temp.us_origin_airport_SFO_global_tmp_view

By contrast, you can access the normal temporary view without the global_temp prefix.

In SQL use: 
SELECT * FROM us_origin_airport_JFK_tmp_view

And to drop the view or table, simply use:

In In SQL: :

DROP VIEW IF EXISTS us_origin_airport_SFO_global_tmp_view
DROP VIEW IF EXISTS us_origin_airport_JFK_tmp_view

Temporary Views vs Global Views

A source of mild confusion among new developers to Spark, the difference is subtle. A temporary view is tied to a single SparkSession within a Spark application. In contrast, a global view is visible across multiple SparkSessions within a Spark application. Yes, you can create multiple SparkSessions within a single Spark application—for some use cases, where you want to access data (and combine them) from two different SparkSessions that don’t share the same Hive metastore configurations.10

Occasionally, you may want to explore data in the metadata about tables and views, managed by Spark and captured in the Catalog. In Spark 2.x, the Catalog has expanded its functionality with new public methods to explore metadata.

Using SparkSession to Peruse Catalog

As mentioned above, Spark manages the metadata associated with each managed or unmanaged table. This is captured in the Catalog—a high-level abstraction in Spark SQL for storage of all metadata. This Catalog is accessible via programmatic interfaces.11 12

For example, within a Spark application, after creating the SparkSession variable spark, you can access all metadata, such as databases, tables, columns, registered UDFs (user defined functions), locations, schema etc., through many of its public methods.

spark.catalog.listDatabases()
spark.catalog.listTables()
spark.catalog.listColumns(“us_delay_flights_tbl”)

Try importing the notebook from Github repo Learning Spark 2ed 13.

The spark.sql.Catalog API offers you the ability to peruse all metadata associated with your database and respective tables or views within the database. Not only can you create databases, tables, and views but can also inquire whether a table is cached or not.

Caching SQL Tables

Although we will discuss table caching strategies in the next chapter, it’s worth mentioning that like DataFrames, you can cache or uncache SQL tables and views too.

In SQL:

CACHE TABLE <table-name>
UNCACHE TABLE <table-name>

Reading Tables into DataFrames

Often, data engineers build data pipelines as part of their regular data ingestion and extract, transform, and load (ETL) process. They populate Spark SQL databases and tables with cleansed data for consumption by applications downstream.

Let’s assume an existing database learn_spark_db and a table us_delay_flights_tbl is ready for use. Instead of you reading from an external JSON file, you can simply use SQL to query the table and assign the returned result to a DataFrame.

// In Scala:
spark.sql(“USE learn_spark_db”)
val usFlightsDF =  spark.sql("SELECT * FROM us_delay_flights_tbl")
val usFlightsDF2 = spark.table(“us_delay_flights_tbl”)

# In Python
spark.sql(“USE learn_spark_db”)
us_flights_df = spark.sql("SELECT * FROM us_delay_flights_tbl")
us_flights_df2 = spark.table(“us_delay_flights_tbl”)

Now you have a cleansed DataFrame read from an existing Spark SQL database and table, created by a data engineer in your team. Furthermore, you can also read data from other file formats using the Spark built-in data sources, giving you the flexibility to interact with various common file formats.

Data Sources for DataFrames and SQL Tables

As shown in Figure 4-1, Spark SQL provides an interface to a variety of data sources. As a primary interface to data sources, Spark provides built-in data sources and a set of common methods for reading and writing data to and from these data sources using the Data Source API.14 15 In Spark 2.x, Data Source API v2 introduced additional built-in data sources with high reliability and fault-tolerance.

In this section, we will cover some of these built-in data sources, file formats, and ways to load and write data, along with any specific options pertaining to these data sources to interact with.

But first let’s understand two high-level Data Source APIs constructs and its format that dictate the manner in which you interact with the respective data source: DataFrameReader and DataFrameWriter.

DataFrameReader

DataFrameReader is the core construct for reading data from a data source into a DataFrame. It has a defined format and a recommended pattern for usage; this pattern of stringing methods together is a common usage in Spark. It also makes it easier to read. We have seen this pattern in Chapter 3 while exploring common data analysis patterns.

DataFrameReader.format(args).option(“key”, “value”).schema(args).load()

You can only access DataFrameReader through a SparkSession instance16 17. That is, you cannot create an instance of DataFrameReader. To get an instance handle to it use:

SparkSession.read or SparkSession.readStream

While read returns a handle to DataStreamReader to read into a DataFrame from a static data source, readStream returns an instance to read from a streaming source. (We will cover Structured Streaming later in the book.)

Arguments to each of the methods to DataFrameReader take different values, and we enumerate each in the table below with a subset of the supported arguments.

Table 4-1. Table 4-1. DataFrameReader methods, arguments, and options
Method Arguments Description
format()
“parquet”, “csv”, “txt”, “json”, “jdbc”, “orc”, “avro”, etc If you don’t specify this method, then the default is Parquet or whatever is set in spark.sql.sources.default
option()
  • (“mode”, {PERMISSIVE | FAILFAST | DROPMALFORMED } )
  • (“inferSchema”, {true | false}
  • (“path”, “path_file_data_source”)
A series of key/value pairs and options.a b
The documentation shows some examples.
schema()
DDL String or StructType, e.g., ‘A INT, B STRING’ or StructType(...)
For JSON or CSV format, you specify to infer the schema in the option() or supply a schema, which makes loading faster and ensures your data conforms to the expected schema.
load()
“/path/to/data/source” This is the path to the data source or can be empty if specified in the option(“path”, “...”)

a https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameReader

b https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader

While we don’t expect to comprehensively enumerate all different combinations of arguments and options, the documentation for Python, Scala, R, and Java offer suggestions and guidance18. But it’s worthwhile to show a couple of examples:

// In Scala
// Use Parquet 
val file  = “/databricks-datasets/learning-spark-v2/data/flight-data/parquet/2010-summary.parquet”
 val df = spark.read.format(“parquet”)
.option(“path”, file)
.load() 
// Use the default format() value in the configuration
 val df2 = spark.read.option(“path”, file).load()
// Use CSV
val df3 = spark.read.format(“csv”)
    .option(“inferSchema”, “true”)
    .option(“header”, “true”)
    .option(“mode”, “PERMISSIVE”)
    .load(“/databricks-datasets/learning-spark-v2/data/flight-data/csv/*”)
// Use JSON
val df4 = spark.read.format(“json”)
        .option(“path”, “/databricks-datasets/learning-spark-v2/data/flight-data/json/*”)
        .load()
Note

In general, no schema is needed when reading from a Parquet static data source since the Parquet metadata usually contains the schema. However, for streaming data source, you will have to provide a schema. (We will cover reading from streaming data sources in chapter 9.) Parquet is also the default and preferred data source for Spark because it’s efficient, uses columnar storage, and employs faster compression algorithm. You will see additional benefits later (such as columnar pushdown) when we cover the Catalyst optimizer in greater depth.

Furthermore, you can omit format(“parquet”) as it will resort to Parquet, Spark’s default data source.

DataFrameWriter

Like its counterpart, DataFrameWriter does the reverse: it saves or writes data to a specified built-in data source. Unlike DataFrameReader, you access its instance not from a SparkSession but from the DataFrame you wish to save.19 20

DataFrameWriter.format(args).option(args).bucketBy(args).partitionBy(args).save()
DataFrameWriter.format(args).option(args).sortBy(args).saveAsTable()

DataFrame.write or DataFrame.writeStream

Arguments to each of the methods to DataFrameWriter also take different values, and we enumerate each in the table below with a subset of the supported arguments:

Table 4-2. Table 4-2. DataFrameWriter methods, arguments, and options
Method Arguments Description
format()
“parquet”, “csv”, “txt”, “json”, “jdbc”, “orc”, “avro”, etc. If you don’t specify this method, then the default is Parquet or whatever is set in spark.sql.sources.default
option()
  • (“mode”, {append | overwrite| ignore | error} )
  • (“path”, “path_to_write_to”)
A series of key/value pairs and options.
The documentation shows some examples.a b
bucketBy()
(numBuckets, col, col…, coln) Number of buckets and names of columns to bucket by. Uses Hive’s bucketing scheme on a file system.
save() saveAsTableI()
“/path/to/data/source”
“table_name”
  • Save to path or can be empty if specified in the option(“path”, “...”)
  • Save to the table name,

a https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameWriter

b https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter

A couple of short examples to illustrate the use of methods and arguments:

// In scala
// Use JSON
val location = …. 
df.write
  .format(“json”)
  .mode(“overwrite”)
  .sortBy(“lines”)
  .save(location)

Parquet

Let’s start with Parquet since it’s the default data source in Spark. Supported and widely used by many big data processing frameworks and platforms, Parquet is an open-source, columnar file format that offers many I/O optimizations such as compression and columnar format, which saves storage space and allows for quick access to data columns. 21

Because of its efficiency and I/O optimizations, we recommend that after you have transformed and cleansed your data, save your DataFrames in the Parquet format for downstream consumption. (Parquet is also the default table open format for Delta Lake, which we will cover in a later chapter.)

Reading Parquet File into DataFrame

Parquet files are stored in a directory structure, where it contains the data files, metadata, a number of compressed files, and some status files. For example, a set of Parquet files in this directory structure include _SUCCESS, _committed_3326188247966943927, _started_3326188247966943927, and part-00000-tid-3326188247966943927-52184c28-6400-4996-ba5a-5a91fc99ddac-18-1-c000.snappy.parquet—there may be a number of part-XXXX compressed files in a directory.22

To read these Parquet files, simply specify the format and path; no need to supply the schema since Parquet saves it as part of its metadata, unless you are reading from a streaming data source.

// In Scala
val file = “/databricks-datasets/learing-spark-v2/data/flight-data/parquet/2010-summary.parquet”
val df = spark.read.format(“parquet”).load(file)

# In Python
file = “...”
df = spark.read.format(“parquet”).load(file)

Reading Parquet File into Spark SQL Table

Aside from reading Parquet files directly into a Spark DataFrame, you can also create a Spark SQL unmanaged table or view directly using SQL.

// In SQL
CREATE OR REPLACE TEMPORARY VIEW us_delay_flights_tbl
    USING parquet
    OPTIONS (
      path "/databricks-datasets/learing-spark-v2/data/flight-data/parquet/2010-summary.parquet" )

Once a table or view is created, you can read data into a DataFrame using SQL as we have seen in some examples above:

// In Scala/Python
spark.sql(“SELECT * FROM us_delay_flights_tbl”).show()
# In Python
spark.sql(“SELECT * FROM us_delay_flights_tbl”).show()

Both operations above return the same results as the DataFrame API:

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|United States    |Romania            |1    |
|United States    |Ireland            |264  |
|United States    |India              |69   |
|Egypt            |United States      |24   |
|Equatorial Guinea|United States      |1    |
|United States    |Singapore          |25   |
|United States    |Grenada            |54   |
|Costa Rica       |United States      |477  |
|Senegal          |United States      |29   |
|United States    |Marshall Islands   |44   |
+-----------------+-------------------+-----+
only showing top 10 rows

Writing DataFrames to Parquet Files

Writing or saving a DataFrame as a table or file is a common operation in Spark. To write a DataFrame simply use the methods and arguments to the DataFrameWriter, supplying the location to save the Parquet files. For example:

// In Scala
df.write.format(“parquet”)
.mode(“overwrite”)
.option(“path”, “/tmp/data/parquet/df_parquet”)
.option(“compression”, “snappy”)
.save()
Under the location path, this will create a set of compact and compressed Parquet files. Since we used snappy as our compression choice, we have snappy compressed files. For brevity, we have only a couple of files; normally, there may be a dozen files created.

-rw-r--r--  1 jules  wheel    0 May 19 10:58 _SUCCESS
-rw-r--r--  1 jules  wheel  966 May 19 10:58 part-00000-bf73a6d3-ee4d-4087-ba4d-131e73ffddd4-c000.snappy.parquet

Writing DataFrame to Spark SQL Table

Writing a DataFrame to a SQL table is as easy as writing to a file, with only one line change: saveAsTable(...). This will create a managed table us_delay_flights_tbl.

// In Scala/Python
df.write
  .mode(“overwrite”)
  .saveAsTable(“us_delay_flights_tbl”)

To reprise, Parquet is a preferred and default built-in data source file format in Spark. It was adopted by many other frameworks. In your ETL processes or data ingestion, we do recommend to save your DataFrames in Parquet format.

JSON

JavaScript Object Notation (JSON) is also a popular data format. It came to prominence as an easy-to-read-and-parse format compared to XML. It has two representational formats: single-line delimited mode and multi-line format mode. 23 Both modes are supported in Spark.

In single line-delimited mode, each line denotes a single JSON object, whereas in a multi-line delimited mode, the entire multi-line object constitutes a single JSON object. To read in this mode set multiLine to true in the option() method.

Reading JSON File into DataFrames

Let’s read a JSON file into our DataFrame as we did with Parquet, except that we use the JSON format in the format method.

// In Scala

val file = “/databricks-datasets/learning-spark-v2/data/flight-data/json/*
val df = spark
    .read
    .format(“json”)
    .option(“path”, file)
    .load()

# In Python
file = “/databricks-datasets/learning-spark-v2/data/flight-data/json/*”
df  = (spark
    .read
    .format(“json”)
    .option(“path”, file)
.load())

Reading JSON file into Spark SQL Table

As with Parquet files, you can create a SQL table from a JSON file

// In SQL

CREATE OR REPLACE TEMPORARY VIEW us_delay_flights_tbl
    USING json
    OPTIONS (
      path  “/databricks-datasets/learning-spark-v2/data/flight-data/json/*”
    )

Once a table is created, you can read data into a DataFrame using SQL as we have seen in some examples above:

// In Scala/Python
spark.sql(“SELECT * FROM us_delay_flights_tbl”).show()


+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|United States    |Romania            |15   |
|United States    |Croatia            |1    |
|United States    |Ireland            |344  |
|Egypt            |United States      |15   |
|United States    |India              |62   |
|United States    |Singapore          |1    |
|United States    |Grenada            |62   |
|Costa Rica       |United States      |588  |
|Senegal          |United States      |40   |
|Moldova          |United States      |1    |
+-----------------+-------------------+-----+
only showing top 10 rows

Writing DataFrame to JSON Files

Saving a DataFrame as a JSON file is simple: specify methods and arguments to your DataFrameWriter, supplying the location to save JSON files. This will create compact JSON files under the location.

// In Scala
df.write.format(“json”)
.mode(“overwrite”)
.options(“path”, “/tmp/data/json/df_json”)
.option(“compression”, “snappy”)
.save()

-rw-r--r--  1 jules  wheel   0 May 16 14:44 _SUCCESS
-rw-r--r--  1 jules  wheel  71 May 16 14:44 part-00000-5149e5b3-fe04-410d-bca6-f92a3cbc81ec-c000.json

JSON Data Source Option

Table 4-3 describes some of the common options for DataFrameReader and DataFrameWriter. For a comprehensive option list, we refer to the programming guide, documentation, and other resources here. 24 25 26

Table 4-3. Table 4-3 Table of the options for DataFrameReader and DataFrameWriter
Property Name Values Meaning Scope
Compression or codec None, uncompressed, bzip2, deflate, gzip, lz4, or snappy Use these compression codec for reading or writing read/write
dateFormat YYYY-mm-dd or SimpleDataFormat Use this format or any format from Java’s SimpleDateFormat read/write
multiline true, false Use multiline mode. Default is single-line mode read
allowsUnquotedFileNames true,false The default is false. Can have unquoted JSON field names read

CSV

As widely used as a plain text file, this common text file format captures each datum or field delimited by a comma; each line with comma separated fields represents a record. Even though a comma is the default separator, you may use other delimiters to separate fields, in cases where a comma is part of your data. For example, a series of temperatures for a day could be separated by a comma. Popular spreadsheets can generate CSV files, thus its common usage among data and business analysts.

Reading CSV File into DataFrame

As with other built-in data source, you can use the DataFrameReader methods and arguments to read a CSV file into a DataFrame.

// In Scala
val file = “/databricks-datasets/learning-spark-v2/data/flight-data/csv/*”
val schema = "DEST_COUNTRY_NAME STRING, ORIGIN_COUNTRY_NAME STRING, count INT"

val df = spark
    .read
    .format(“csv”)
    .schema(schema)
    .option(“header”, “true”)
    .option(“mode”, “FAILFAST”)     // exit if any errors
    .option(“nullValue”, “”)        // replace any null data field with “”
    .option(“path”, file)
    .load()

# In Python
file = “/databricks-datasets/learning-spark-v2/data/flight-data/csv/*”
schema = "DEST_COUNTRY_NAME STRING, ORIGIN_COUNTRY_NAME STRING, count INT"
df  = (spark
    .read
    .format("csv")
    .option("header", "true")
    .schema(schema)
    .option("mode", "FAILFAST")      # exit if any errors
    .option("nullValue", "")      # replace any null data field with “”
    .option("path", file)
    .load())

Reading CSV File into Spark SQL Table

This is not different from Parquet or JSON when creating SQL tables using a CSV data source.

// In SQL
CREATE or REPLACE TEMPORARY VIEW us_delay_flights_tbl
    USING csv
    OPTIONS (
      path "/databricks-datasets/LearingSpark-v2/data/flight-data/csv/*",
    header “true”,
      inferSchema “true”,
    mode “FAILFAST”
    )

Once a table is created, you can read data into a DataFrame using SQL as we have seen in some examples above:

// In Scala
spark.sql(“SELECT * FROM us_delay_flights_tbl”).show()
# In Python
df = spark.sql(“SELECT * FROM us_delay_flights_tbl”).show()
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|United States    |Romania            |1    |
|United States    |Ireland            |264  |
|United States    |India              |69   |
|Egypt            |United States      |24   |
|Equatorial Guinea|United States      |1    |
|United States    |Singapore          |25   |
|United States    |Grenada            |54   |
|Costa Rica       |United States      |477  |
|Senegal          |United States      |29   |
|United States    |Marshall Islands   |44   |
+-----------------+-------------------+-----+
only showing top 10 rows

Writing DataFrame to CSV File

Saving a DataFrame as CSV file is simple: specify methods and arguments to your DataFrameWriter, supplying the location to save CSV files.

// In Scala

df.write.format(“csv”)
.mode(“overwrite”)
.options(“path”, “/tmp/data/csv/df_csv”)
.save()

This generates a folder under the specified location, populating it with a bunch of smaller compressed and compact files under the “/tmp/data/csv/df_csv” folder:

-rw-r--r--  1 jules  wheel   0 May 16 12:17 _SUCCESS
-rw-r--r--  1 jules  wheel  36 May 16 12:17 part-00000-251690eb-682c-4a6d-8c42-d6abfa663125-c000.csv

CSV Data Source Option

Table 4-4 describes some of the common options for DataFrameReader and DataFrameWriter. Because CSV files can be complex, there is a comprehensive list of options documented. However, we provide some common options in this table. 27 28 29.

Table 4-4. Table 4-4. Describes some of the options for DataFrameReader and DataFrameWriter
Property Name Values Meaning Scope
Compression or codec None, uncompressed, bzip2, deflate, gzip, lz4, or snappy Use these compression codec for reading or writing read/write
dateFormat YYYY-mm-dd or SimpleDataFormat Use this format or any format from Java’s SimpleDateFormat read/write
multiLine true, false Use multiline mode. Default is single-line mode read
inferSchema true,false Default is false. If true, Spark will determine the column data types read
sep Any character Default delimiter is a comma (“,”) read/write
escape Any character Default is “\”. Use this character to escape characters while reading. read
header true,false Default is false. Indicated whether the first line is a header denoting each column name. read/write

Avro

Introduced in Spark 2.4 as a built-in data source, Avro format is used by Apache Kafka for message serializing and deserializing for many reasons: direct mapping to JSON; efficient and fast; and bindings available for many programming languages.30 31

Reading Avro File into DataFrame

Reading an Avro file using DataFrameReader is consistent in usage with other data sources we have discussed in this section.32

// In Scala
val df = spark.read
.format(“avro”)
.option(“path”, “/tmp/data/avro/episodes.avro”)
.load()

df.show(false)

# In Python
df = (spark.read
.format(“avro”)
.option(“path”, “/tmp/data/avro/episodes.avro”)
.load())

 df.show(False)
+-----------------------+----------------+------+
|title                  |air_date        |doctor|
+-----------------------+----------------+------+
|The Eleventh Hour      |3 April 2010    |11    |
|The Doctor's Wife      |14 May 2011     |11    |
|Horror of Fang Rock    |3 September 1977|4     |
|An Unearthly Child     |23 November 1963|1     |
|The Mysterious Planet  |6 September 1986|6     |
|Rose                   |26 March 2005   |9     |
|The Power of the Daleks|5 November 1966 |2     |
|Castrolava             |4 January 1982  |5     |
+-----------------------+----------------+------+

Reading Avro File into Spark SQL Table

Again, this is no different from Parquet, JSON or CSV when creating SQL tables using an Avro data source.

CREATE or REPLACE TEMPORARY VIEW episode_tbl
    USING avro
    OPTIONS (
      path "/tmp/data/avro/episodes.avro”
    )

Once a view is created, you can read data into a DataFrame using SQL as we have seen in some examples above:

// In Scala
spark.sql(“SELECT * FROM episode_tbl”).show(false)
// In Python
spark.sql(“SELECT * FROM episode_tbl”).show(False)

+-----------------------+----------------+------+
|title                  |air_date        |doctor|
+-----------------------+----------------+------+
|The Eleventh Hour      |3 April 2010    |11    |
|The Doctor's Wife      |14 May 2011     |11    |
|Horror of Fang Rock    |3 September 1977|4     |
|An Unearthly Child     |23 November 1963|1     |
|The Mysterious Planet  |6 September 1986|6     |
|Rose                   |26 March 2005   |9     |
|The Power of the Daleks|5 November 1966 |2     |
|Castrolava             |4 January 1982  |5     |
+-----------------------+----------------+------+

Writing DataFrame to Avro File

Writing a DataFrame as Avro file is simple: specify methods and arguments to your DataFrameWriter, supplying the location to save Avro files to.

// In Scala
df.write
.format("avro")
.mode("overwrite")
.option("path", "/tmp/data/episodes_avro")
.save()

This generates a folder under the specified location, populating with a bunch of smaller compressed and compact files under the “/tmp/data/csv/episodes.avro” folder:

-rw-r--r--  1 jules  wheel    0 May 17 11:54 _SUCCESS
-rw-r--r--  1 jules  wheel  526 May 17 11:54 part-00000-ffdf70f4-025f-4ec1-b25e-4e2fbade0312-c000.avro

Avro Data Source Options

Table 4-5 describes some of the common options for DataFrameReader and DataFrameWriter. A comprehensive list of options is in the documentation.33

Property Name Default Meaning Scope
avroSchema None Optional Avro schema provided by a user in JSON format. The date type and naming of record fields should match the input Avro data or Catalyst data [Spark internal data type], otherwise the read/write action will fail. read/write
recordName topLevelRecord Top level record name in write result, which is required in Avro spec. read/write
recordNamespace “” Record namespace in write result. write
ignoreExtension true The option controls ignoring of files without .avro extensions in read.
If the option is enabled, all files (with and without .avro extension) are loaded.
read
compression snappy The compression option allows to specify a compression codec used in writing.
Currently supported codecs are uncompressed, snappy, deflate, bzip2 and xz.
If the option is not set, the configuration spark.sql.avro.compression.codec config is taken into account.
write

ORC

As an additional optimized columnar file format, Spark 2.x supports vectorized ORC reader. Two Spark configurations dictate which ORC implementation to use. When Spark configurations spark.sql.orc.impl is set to native and spark.sql.orc.enableVectorizedReader is set to true, Spark uses the vectorized ORC reader. A vectorized reader reads blocks of rows (often 1024 per block) instead of one row at a time, streamlining operations and reducing CPU usage for intensive operations like scans, filters, aggregations, and joins.34

For creating Hive ORC serde tables, especially in the SQL command USING HIVE options, vectorized reader is used when the Spark configuration spark.sql.hive.convertMetastoreOrc is set to true. Serde is serialization and deserialization FileFormat used by Hive tables. 35 36

Reading ORC File into DataFrame

To read a DataFrame using the ORC vectorized reader, you can just use DataFrameReader with the normal methods and options as:

// In Scala: 
val file = “/databricks-datasets/learing-spark-v2/data/flight-data/orc/*.*
val df = spark.read
.format(“orc”)
.option(“path”, file)
.load()

df.show(10, false)

# In Python
file  = “/databricks-datasets/learning-spark-v2/data/flight-data/orc/*.*”
df = (spark.read
.format(“orc”)
.option(“path”, file)
.load())

 df.show(10, False)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|United States    |Romania            |1    |
|United States    |Ireland            |264  |
|United States    |India              |69   |
|Egypt            |United States      |24   |
|Equatorial Guinea|United States      |1    |
|United States    |Singapore          |25   |
|United States    |Grenada            |54   |
|Costa Rica       |United States      |477  |
|Senegal          |United States      |29   |
|United States    |Marshall Islands   |44   |
+-----------------+-------------------+-----+
only showing top 10 rows

Reading ORC File into Spark SQL Table

There is no difference from Parquet, JSON, CSV, or Avro when creating SQL view using an ORC data source.

// In SQL
CREATE or REPLACE TEMPORARY VIEW us_delay_flights_tbl
    USING orc
    OPTIONS (
      path “/databricks-datasets/learning-spark-v2/data/flight-data/orc/*.*”
    )

Once a table is created, you can read data into a DataFrame using SQL as we have seen in some examples above:

// In Scala/Python
spark.sql(“SELECT * FROM us_delay_flights_tbl”).show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|United States    |Romania            |1    |
|United States    |Ireland            |264  |
|United States    |India              |69   |
|Egypt            |United States      |24   |
|Equatorial Guinea|United States      |1    |
|United States    |Singapore          |25   |
|United States    |Grenada            |54   |
|Costa Rica       |United States      |477  |
|Senegal          |United States      |29   |
|United States    |Marshall Islands   |44   |
+-----------------+-------------------+-----+
only showing top 10 rows

Writing DataFrame to ORC File

Writing back a transformed DataFrame after reading is equally simple using the DataFrameWriter methods.

// In Scala
df.write.format(“orc”)
.mode(“overwrite”)
.option(“path”, “/tmp/data/orc/df_orc”)
.option(“compression”, “snappy”)
.save()

The result will be a folder under the location path with some compressed ORC files.

-rw-r--r--  1 jules  wheel    0 May 16 17:23 _SUCCESS
-rw-r--r--  1 jules  wheel  547 May 16 17:23 part-00000-bd41c3ec-38d8-4263-af46-6a0b3150c5ab-c000.snappy.orc

Image

In Spark 2.x, the community introduced a new Data Source, image files, to support deep learning and machine learning frameworks such as TensorFlow, PyTorch, and Deep Learning Pipelines as first-class citizens of Apache Spark.37 38 For computer vision machine learning applications, loading and processing image datasets is important.

Reading Images File into DataFrame

As with all above file formats, you can use the DataFrameReader’s methods and options as:

In Scala:

import org.apache.spark.ml.source.image

val imageDir = “/databricks-datasets/cctvVideos/train_images/”
val imagesDF = spark.read.format(“image”).load(image_dir)

imagesDF.printSchema
imagesDF.select("image.height", "image.width", "image.nChannels", "image.mode", "label").show(5, false)

In Python:

from pyspark.ml.source import image

image_dir = “/databricks-datasets/cctvVideos/train_images/”
images_df = spark.read.format(“image”).load(image_dir)
images_df.printSchema()

images_df.select("image.height", "image.width", "image.nChannels", "image.mode", "label").show(5, truncate=False)

root
 |-- image: struct (nullable = true)
 |    |-- origin: string (nullable = true)
 |    |-- height: integer (nullable = true)
 |    |-- width: integer (nullable = true)
 |    |-- nChannels: integer (nullable = true)
 |    |-- mode: integer (nullable = true)
 |    |-- data: binary (nullable = true)
 |-- label: integer (nullable = true)


+------+-----+---------+----+-----+
|height|width|nChannels|mode|label|
+------+-----+---------+----+-----+
|288   |384  |3        |16  |0    |
|288   |384  |3        |16  |1    |
|288   |384  |3        |16  |0    |
|288   |384  |3        |16  |0    |
|288   |384  |3        |16  |0    |
+------+-----+---------+----+-----+
only showing top 5 rows

In the above sections, you got a tour of how to read data into a DataFrame from built-in data source file formats; we also showed how to create temporary views and tables from existing built-in data sources. Either way, the queries—whether using the DataFrame API or SQL—produces identical outcomes. You can examine some of these queries in the notebook in the Learning Spark 2d GitHub 39 repo and try the exercises.

Summary

To recap, this chapter explored the interoperability between DataFrame and Spark SQL. In particular, we got a flavor of how to use Spark SQL to:

  • create managed and unmanaged tables from both DataFrames and SQL

  • read from and write to various built-in data sources and file formats

  • employ spark.sql programmatic interface to issue SQL queries to structured data stored as Spark SQL tables or views

  • peruse spark.Catalog to inspect metadata associated with tables and views

  • use DataFrameWriter and DataFrameReader APIs

Through code snippets in the chapter and notebooks in the Learning Spark 2ed Github 40, you got a feel for how to use DataFrames and Spark SQL. Continuing in this light, the next chapter further explores how Spark interacts with external data sources shown in Figure-4-1, a more in-depth examples of transformations and interoperability between DataFrame and Spark SQL, take a peek at SQL queries in Spark UI, and a glance at Spark SQL performance.

1 https://en.wikipedia.org/wiki/SQL:2003

2 https://databricks.com/blog/2016/07/26/introducing-apache-spark-2-0.html

3 https://databricks.com/blog/2016/08/15/how-to-use-sparksession-in-apache-spark-2-0.html

4 https://catalog.data.gov/dataset/airline-on-time-performance-and-causes-of-flight-delays-on-time-data

5 https://github.com/databricks/LearningSparkV2

6 https://spark.apache.org/sql/

7 https://docs.databricks.com/spark/latest/data-sources/index.html

8 https://github.com/databricks/LearningSparkV2

9 https://docs.databricks.com/spark/latest/spark-sql/language-manual/create-view.html#id1

10 https://www.waitingforcode.com/apache-spark-sql/multiple-sparksession-one-sparkcontext/read#mutliple_SparkSessions_use_cases

11 https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.catalog.Catalog

12 https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.Catalog

13 https://github.com/databricks/LearningSparkV2

14 https://databricks.com/blog/2015/01/09/spark-sql-data-sources-api-unified-data-access-for-the-spark-platform.html

15 https://spark.apache.org/docs/latest/sql-data-sources.html#data-sources

16 https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameReader

17 https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader

18 https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#manually-specifying-options

19 https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameWriter

20 https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter

21 https://spark.apache.org/docs/latest/sql-data-sources-parquet.html

22 https://github.com/apache/parquet-format#file-format

23 https://docs.databricks.com/spark/latest/data-sources/read-json.html

24 Spark: Definitive Guide, pg 157-165

25 https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter

26 https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader

27 Spark: Definitive Guide, pg 157

28 https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter

29 https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader

30 https://databricks.com/blog/2018/11/30/apache-avro-as-a-built-in-data-source-in-apache-spark-2-4.html

31 https://www.confluent.io/blog/avro-kafka-data/

32 https://docs.databricks.com/spark/latest/data-sources/read-avro.html#

33 https://spark.apache.org/docs/latest/sql-data-sources-avro.html

34 https://cwiki.apache.org/confluence/display/Hive/Vectorized+Query+Execution

35 https://cwiki.apache.org/confluence/display/Hive/DeveloperGuide#DeveloperGuide-HiveSerDe

36 https://spark.apache.org/docs/latest/sql-data-sources-orc.html

37 https://databricks.com/blog/2018/11/08/introducing-apache-spark-2-4.html

38 https://databricks.com/blog/2018/12/10/introducing-built-in-image-data-source-in-apache-spark-2-4.html

39 https://github.com/databricks/LearningSparkV2

40 https://github.com/databricks/LearningSparkV2

With Safari, you learn the way you learn best. Get unlimited access to videos, live online training, learning paths, books, interactive tutorials, and more.

Start Free Trial

No credit card required