Capítulo 4. Spark SQL y DataFrames: Introducción a las fuentes de datos incorporadas

Este trabajo se ha traducido utilizando IA. Agradecemos tus opiniones y comentarios: translation-feedback@oreilly.com

En el capítulo anterior, explicamos la evolución y la justificación de la estructura en Spark. En particular, discutimos cómo el motor Spark SQL proporciona una base unificada para las API de alto nivel DataFrame y Dataset. Ahora, continuaremos nuestra discusión sobre el DataFrame y exploraremos su interoperabilidad con Spark SQL.

Este capítulo y el siguiente también exploran cómo interactúa Spark SQL con algunos de los componentes externos que se muestran en la Figura 4-1.

En particular, Spark SQL:

  • Proporciona el motor sobre el que se construyen las API estructuradas de alto nivel que exploramos en el Capítulo 3.

  • Puede leer y escribir datos en diversos formatos estructurados (por ejemplo, JSON, tablas Hive, Parquet, Avro, ORC, CSV).

  • Te permite consultar datos mediante conectores JDBC/ODBC desde fuentes de datos externas de inteligencia empresarial (BI) como Tableau, Power BI, Talend, o desde RDBMS como MySQL y PostgreSQL.

  • Proporciona una interfaz programática para interactuar con datos estructurados almacenados como tablas o vistas en una base de datos desde una aplicación Spark

  • Ofrece un intérprete de comandos interactivo para realizar consultas SQL en tus datos estructurados.

  • Admite comandos compatibles con ANSI SQL:2003 y HiveQL.

Spark SQL connectors and data sources
Figura 4-1. Conectores Spark SQL y fuentes de datos

Empecemos por cómo puedes utilizar Spark SQL en una aplicación Spark.

Uso de Spark SQL en aplicaciones Spark

El SparkSession, introducido en Spark 2.0, proporciona un punto de entrada unificado para programar Spark con las API estructuradas. Puedes utilizar SparkSession para acceder a la funcionalidad de Spark: sólo tienes que importar la clase y crear una instancia en tu código.

Para emitir cualquier consulta SQL, utiliza el método sql() en la instancia SparkSession, spark, como spark.sql("SELECT * FROM myTableName"). Todas las consultas spark.sql ejecutadas de este modo devuelven un DataFrame sobre el que puedes realizar más operaciones Spark si lo deseas, del tipo que exploramos en el Capítulo 3 y de las que aprenderás en este capítulo y en el siguiente.

Ejemplos de consulta básica

En esta sección veremos algunos ejemplos de consultas sobre el conjunto de datos Rendimiento puntual de las aerolíneas y causas de los retrasos en los vuelos, que contiene datos sobre vuelos estadounidenses, incluyendo fecha, retraso, distancia, origen y destino. Está disponible como archivo CSV con más de un millón de registros. Utilizando un esquema, leeremos los datos en un DataFrame y registraremos el DataFrame como una vista temporal (en breve hablaremos más de las vistas temporales) para poder consultarlo con SQL.

Los ejemplos de consulta se proporcionan en fragmentos de código, y los cuadernos de Python y Scala que contienen todo el código presentado aquí están disponibles en el repositorio GitHub del libro. Estos ejemplos te ofrecerán una muestra de cómo utilizar SQL en tus aplicaciones Spark a través de la interfaz programáticaspark.sql . Similar a la API DataFrame en su sabor declarativo, esta interfaz te permite consultar datos estructurados en tus aplicaciones Spark .

Normalmente, en una aplicación Spark autónoma, crearás una instancia de SparkSession manualmente, como se muestra en el siguiente ejemplo. Sin embargo, en un intérprete de comandos Spark (o en un cuaderno Databricks), SparkSession se crea por ti y es accesible a través de la variable con el nombre apropiado spark.

Empecemos por leer el conjunto de datos en una vista temporal:

// In Scala
import org.apache.spark.sql.SparkSession            
val spark = SparkSession
  .builder
  .appName("SparkSQLExampleApp")
  .getOrCreate()

// Path to data set 
val csvFile="/databricks-datasets/learning-spark-v2/flights/departuredelays.csv"

// Read and create a temporary view
// Infer schema (note that for larger files you may want to specify the schema)
val df = spark.read.format("csv")
  .option("inferSchema", "true")
  .option("header", "true")
  .load(csvFile)
// Create a temporary view
df.createOrReplaceTempView("us_delay_flights_tbl")
# In Python
from pyspark.sql import SparkSession        
# Create a SparkSession
spark = (SparkSession
  .builder
  .appName("SparkSQLExampleApp")
  .getOrCreate())

# Path to data set
csv_file = "/databricks-datasets/learning-spark-v2/flights/departuredelays.csv"

# Read and create a temporary view
# Infer schema (note that for larger files you 
# may want to specify the schema)
df = (spark.read.format("csv")
  .option("inferSchema", "true")
  .option("header", "true")
  .load(csv_file))
df.createOrReplaceTempView("us_delay_flights_tbl")
Nota

Si quieres especificar un esquema, puedes utilizar una cadena con formato DDL. Por ejemplo

// In Scala
val schema = "date STRING, delay INT, distance INT, 
 origin STRING, destination STRING"
# In Python
schema = "`date` STRING, `delay` INT, `distance` INT, 
 `origin` STRING, `destination` STRING"

Ahora que tenemos una vista temporal, podemos realizar consultas SQL utilizando Spark SQL. Estas consultas no difieren de las que podrías hacer a una tabla SQL en, por ejemplo, una base de datos MySQL o PostgreSQL. El objetivo aquí es mostrar que Spark SQL ofrece una interfaz SQL compatible con ANSI:2003, y demostrar la interoperabilidad entre SQL y DataFrames.

El conjunto de datos sobre retrasos de vuelos en EEUU tiene cinco columnas:

  • La columna date contiene una cadena como 02190925. Cuando se convierte, se asigna a 02-19 09:25 am.

  • La columna delay indica el retraso en minutos entre la hora de salida programada y la real. Las salidas tempranas muestran números negativos.

  • La columna distance da la distancia en millas desde el aeropuerto de origen al de destino.

  • La columna origin contiene el código IATA del aeropuerto de origen.

  • La columna destination contiene el código IATA del aeropuerto de destino.

Teniendo esto en cuenta, probemos algunas consultas de ejemplo con este conjunto de datos.

En primer lugar, buscaremos todos los vuelos cuya distancia sea superior a 1.000 millas:

spark.sql("""SELECT distance, origin, destination 
FROM us_delay_flights_tbl WHERE distance > 1000 
ORDER BY distance DESC""").show(10)

+--------+------+-----------+
|distance|origin|destination|
+--------+------+-----------+
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
+--------+------+-----------+
only showing top 10 rows

Como muestran los resultados, todos los vuelos más largos fueron entre Honolulú (HNL) y Nueva York (JFK). A continuación, encontraremos todos los vuelos entre San Francisco (SFO) y Chicago (ORD) con al menos dos horas de retraso:

spark.sql("""SELECT date, delay, origin, destination 
FROM us_delay_flights_tbl 
WHERE delay > 120 AND ORIGIN = 'SFO' AND DESTINATION = 'ORD' 
ORDER by delay DESC""").show(10)

+--------+-----+------+-----------+
|date    |delay|origin|destination|
+--------+-----+------+-----------+
|02190925|1638 |SFO   |ORD        |
|01031755|396  |SFO   |ORD        |
|01022330|326  |SFO   |ORD        |
|01051205|320  |SFO   |ORD        |
|01190925|297  |SFO   |ORD        |
|02171115|296  |SFO   |ORD        |
|01071040|279  |SFO   |ORD        |
|01051550|274  |SFO   |ORD        |
|03120730|266  |SFO   |ORD        |
|01261104|258  |SFO   |ORD        |
+--------+-----+------+-----------+
only showing top 10 rows

Parece que hubo muchos vuelos con retrasos importantes entre estas dos ciudades, en fechas diferentes. (Como ejercicio, convierte la columna date a un formato legible y encuentra los días o meses en que estos retrasos fueron más frecuentes. ¿Estaban los retrasos relacionados con los meses de invierno o con las vacaciones)?

Intentemos una consulta más complicada en la que utilicemos la cláusula CASE de SQL. En el siguiente ejemplo, queremos etiquetar todos los vuelos de EE.UU., independientemente de su origen y destino, con una indicación de los retrasos que sufrieron: Retrasos Muy Largos (> 6 horas), Retrasos Largos (2-6 horas), etc. Añadiremos estas etiquetas legibles por humanos en una nueva columna llamada Flight_Delays:

spark.sql("""SELECT delay, origin, destination, 
              CASE
                  WHEN delay > 360 THEN 'Very Long Delays'
                  WHEN delay >= 120 AND delay <= 360 THEN 'Long Delays'
                  WHEN delay >= 60 AND delay < 120 THEN 'Short Delays'
                  WHEN delay > 0 and delay < 60 THEN 'Tolerable Delays'
                  WHEN delay = 0 THEN 'No Delays'
                  ELSE 'Early'
               END AS Flight_Delays
               FROM us_delay_flights_tbl
               ORDER BY origin, delay DESC""").show(10)

+-----+------+-----------+-------------+
|delay|origin|destination|Flight_Delays|
+-----+------+-----------+-------------+
|333  |ABE   |ATL        |Long Delays  |
|305  |ABE   |ATL        |Long Delays  |
|275  |ABE   |ATL        |Long Delays  |
|257  |ABE   |ATL        |Long Delays  |
|247  |ABE   |DTW        |Long Delays  |
|247  |ABE   |ATL        |Long Delays  |
|219  |ABE   |ORD        |Long Delays  |
|211  |ABE   |ATL        |Long Delays  |
|197  |ABE   |DTW        |Long Delays  |
|192  |ABE   |ORD        |Long Delays  |
+-----+------+-----------+-------------+
only showing top 10 rows

Al igual que con las APIs DataFrame y Dataset, con la interfaz spark.sql puedes realizar operaciones comunes de análisis de datos como las que hemos explorado en el capítulo anterior. Los cálculos realizan un recorrido idéntico en el motor SQL de Spark (para más detalles, consulta "El optimizador Catalyst" en el Capítulo 3 ), proporcionándote los mismos resultados.

Las tres consultas SQL anteriores pueden expresarse con una consulta equivalente de la API de DataFrame. Por ejemplo, la primera consulta puede expresarse en la API Python DataFrame como:

# In Python
from pyspark.sql.functions import col, desc
(df.select("distance", "origin", "destination")
  .where(col("distance") > 1000)
  .orderBy(desc("distance"))).show(10)

# Or
(df.select("distance", "origin", "destination")
  .where("distance > 1000")
  .orderBy("distance", ascending=False).show(10))

Esto produce los mismos resultados que la consulta SQL:

+--------+------+-----------+
|distance|origin|destination|
+--------+------+-----------+
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
+--------+------+-----------+
only showing top 10 rows

Como ejercicio, intenta convertir las otras dos consultas SQL para que utilicen la API DataFrame.

Como muestran estos ejemplos, utilizar la interfaz SQL de Spark para consultar datos es similar a escribir una consulta SQL normal a una tabla de base de datos relacional. Aunque las consultas están en SQL, puedes sentir la similitud en legibilidad y semántica con las operaciones de la API DataFrame, que conociste en el Capítulo 3 y explorarás más a fondo en el próximo capítulo.

Para que puedas consultar datos estructurados como se muestra en los ejemplos anteriores, Spark gestiona todas las complejidades de la creación y gestión de vistas y tablas, tanto en memoria como en disco. Esto nos lleva a nuestro siguiente tema: cómo se crean y gestionan las tablas y las vistas .

Tablas y vistas SQL

Las tablas contienen datos. Asociados a cada tabla en Spark están sus metadatos pertinentes, que son información sobre la tabla y sus datos: el esquema, la descripción, el nombre de la tabla, el nombre de la base de datos, los nombres de las columnas, las particiones, la ubicación física donde residen los datos reales, etc. Todo esto se almacena en un metastore central.

En lugar de tener un metastore separado para las tablas Spark, Spark utiliza por defecto el metastore de Apache Hive, ubicado en /user/hive/warehouse, para persistir todos los metadatos sobre tus tablas. Sin embargo, puedes cambiar la ubicación por defecto estableciendo la variable de configuración de Spark spark.sql.warehouse.dir en otra ubicación, que puede establecerse en un almacenamiento distribuido local o externo.

Tablas gestionadas frente a no gestionadas

Spark te permite crear dos tipos de tablas: gestionadas y no gestionadas. Para una tabla gestionada, Spark gestiona tanto los metadatos como los datos en el almacén de archivos. Éste puede ser un sistema de archivos local, HDFS o un almacén de objetos como Amazon S3 o Azure Blob. Para una tabla no gestionada, Spark sólo gestiona los metadatos, mientras que tú mismo gestionas los datos en una fuente de datos externa como Cassandra.

Con una tabla gestionada, como Spark lo gestiona todo, un comando SQL como DROP TABLE table_name borra tanto los metadatos como los datos. Con una tabla no gestionada, el mismo comando borrará sólo los metadatos, no los datos reales. Veremos algunos ejemplos de cómo crear tablas gestionadas y no gestionadas en la siguiente sección.

Crear bases de datos y tablas SQL

Las tablas residen dentro de una base de datos. Por defecto, Spark crea tablas en la base de datos default. Para crear tu propio nombre de base de datos, puedes emitir un comando SQL desde tu aplicación o cuaderno Spark. Utilizando el conjunto de datos de retrasos de vuelos en EE.UU., vamos a crear una tabla gestionada y otra no gestionada. Para empezar, crearemos una base de datos llamada learn_spark_db y le diremos a Spark que queremos utilizar esa base de datos:

// In Scala/Python
spark.sql("CREATE DATABASE learn_spark_db")
spark.sql("USE learn_spark_db")

A partir de este punto, cualquier comando que emitamos en nuestra aplicación para crear tablas tendrá como resultado que las tablas se crearán en esta base de datos y residirán bajo el nombre de base de datos learn_spark_db.

Crear una tabla gestionada

Para crear una tabla gestionada dentro de la base de datos learn_spark_db, puedes realizar una consulta SQL como la siguiente:

// In Scala/Python
spark.sql("CREATE TABLE managed_us_delay_flights_tbl (date STRING, delay INT,  
  distance INT, origin STRING, destination STRING)")

Puedes hacer lo mismo utilizando la API DataFrame de esta forma:

# In Python
# Path to our US flight delays CSV file 
csv_file = "/databricks-datasets/learning-spark-v2/flights/departuredelays.csv"
# Schema as defined in the preceding example
schema="date STRING, delay INT, distance INT, origin STRING, destination STRING"
flights_df = spark.read.csv(csv_file, schema=schema)
flights_df.write.saveAsTable("managed_us_delay_flights_tbl")

Ambas sentencias crearán la tabla gestionada us_delay_flights_tbl en la base de datos learn_spark_db.

Crear una tabla no gestionada

En cambio, puedes crear tablas no gestionadas a partir de tus propias fuentes de datos -por ejemplo, archivos Parquet, CSV o JSON almacenados en un almacén de archivos accesible a tu aplicación Spark.

Para crear una tabla no gestionada a partir de una fuente de datos, como un archivo CSV, en SQL utiliza:

spark.sql("""CREATE TABLE us_delay_flights_tbl(date STRING, delay INT, 
  distance INT, origin STRING, destination STRING) 
  USING csv OPTIONS (PATH 
  '/databricks-datasets/learning-spark-v2/flights/departuredelays.csv')""")

Y dentro de la API DataFrame utiliza

(flights_df
  .write
  .option("path", "/tmp/data/us_flights_delay")
  .saveAsTable("us_delay_flights_tbl"))
Nota

Para que puedas explorar estos ejemplos, hemos creado cuadernos de ejemplos de Python y Scala que puedes encontrar en el repositorio GitHub del libro.

Crear vistas

Además de crear tablas, Spark puede crear vistas sobre las tablas existentes. Las vistas pueden ser globales (visibles en todos los SparkSessions de un cluster determinado) o de sesión (visibles sólo en un SparkSession), y son temporales: desaparecen cuando termina tu aplicación Spark.

Lacreación de vistas tiene una sintaxis similar a la creación de tablas dentro de una base de datos. Una vez creada una vista, puedes consultarla como harías con una tabla. La diferencia entre una vista y una tabla es que las vistas no conservan realmente los datos; las tablas persisten después de que finalice tu aplicación Spark, pero las vistas desaparecen.

Puedes crear una vista a partir de una tabla existente utilizando SQL. Por ejemplo, si deseas trabajar sólo con el subconjunto del conjunto de datos de retrasos de vuelos en EE.UU. con los aeropuertos de origen de Nueva York (JFK) y San Francisco (SFO), las siguientes consultas crearán vistas temporales y temporales globales consistentes sólo en esa porción de la tabla:

-- In SQL
CREATE OR REPLACE GLOBAL TEMP VIEW us_origin_airport_SFO_global_tmp_view AS
  SELECT date, delay, origin, destination from us_delay_flights_tbl WHERE 
  origin = 'SFO';

CREATE OR REPLACE TEMP VIEW us_origin_airport_JFK_tmp_view AS
  SELECT date, delay, origin, destination from us_delay_flights_tbl WHERE 
  origin = 'JFK'

Puedes conseguir lo mismo con la API del Marco de datos de la siguiente manera:

# In Python
df_sfo = spark.sql("SELECT date, delay, origin, destination FROM 
  us_delay_flights_tbl WHERE origin = 'SFO'")
df_jfk = spark.sql("SELECT date, delay, origin, destination FROM 
  us_delay_flights_tbl WHERE origin = 'JFK'")

# Create a temporary and global temporary view
df_sfo.createOrReplaceGlobalTempView("us_origin_airport_SFO_global_tmp_view")
df_jfk.createOrReplaceTempView("us_origin_airport_JFK_tmp_view")

Una vez creadas estas vistas, puedes realizar consultas sobre ellas igual que lo harías sobre una tabla. Ten en cuenta que cuando accedas a una vista temporal global debes utilizar el prefijo global_temp.<view_name>porque Spark crea vistas temporales globales en una base de datos temporal global llamada global_temp. Por ejemplo:

-- In SQL 
SELECT * FROM global_temp.us_origin_airport_SFO_global_tmp_view

En cambio, puedes acceder a la vista temporal normal sin el prefijo global_temp :

-- In SQL 
SELECT * FROM us_origin_airport_JFK_tmp_view
// In Scala/Python
spark.read.table("us_origin_airport_JFK_tmp_view")
// Or
spark.sql("SELECT * FROM us_origin_airport_JFK_tmp_view")

También puedes soltar una vista igual que harías con una tabla:

-- In SQL
DROP VIEW IF EXISTS us_origin_airport_SFO_global_tmp_view;
DROP VIEW IF EXISTS us_origin_airport_JFK_tmp_view
// In Scala/Python
spark.catalog.dropGlobalTempView("us_origin_airport_SFO_global_tmp_view")
spark.catalog.dropTempView("us_origin_airport_JFK_tmp_view")

Vistas temporales frente a vistas temporales globales

La diferencia entre vistas temporales y vistas temporales globales es sutil, pero puede ser una fuente de ligera confusión entre los desarrolladores que se inician en Spark. Una vista temporal está vinculada a un único SparkSession dentro de una aplicación Spark. Por el contrario, una vista temporal global es visible a través de múltiples SparkSessions dentro de una aplicación Spark. Sí, puedes crear varios SparkSession s dentro de una única aplicación Spark; esto puede ser útil, por ejemplo, en casos en los que quieras acceder (y combinar) datos de dos SparkSessions diferentes que no compartan las mismas configuraciones de metastore de Hive.

Ver los metadatos

Como se ha mencionado anteriormente, Spark gestiona los metadatos asociados a cada tabla gestionada o no gestionada. Esto se captura en el Cataloguna abstracción de alto nivel en Spark SQL para almacenar metadatos. La funcionalidad de Catalogse amplió en Spark 2.x con nuevos métodos públicos que te permiten examinar los metadatos asociados a tus bases de datos, tablas y vistas. Spark 3.0 lo amplía para utilizar catalog externo (del que hablamos brevemente en el Capítulo 12).

Por ejemplo, dentro de una aplicación Spark, después de crear la variable SparkSession spark , puedes acceder a todos los metadatos almacenados a través de métodos como éstos:

// In Scala/Python
spark.catalog.listDatabases()
spark.catalog.listTables()
spark.catalog.listColumns("us_delay_flights_tbl")

Importa el cuaderno del repositorio GitHub del libro y pruébalo.

Almacenamiento en caché de tablas SQL

Aunque hablaremos de las estrategias de almacenamiento en caché de tablas en el próximo capítulo, merece la pena mencionar aquí que, al igual que los DataFrames, puedes almacenar en caché y eliminar de la caché tablas y vistas SQL. En Spark 3.0, además de otras opciones, puedes especificar una tabla como LAZY, lo que significa que sólo debe almacenarse en caché cuando se utilice por primera vez, en lugar de inmediatamente:

-- In SQL
CACHE [LAZY] TABLE <table-name>
UNCACHE TABLE <table-name>

Leer tablas en marcos de datos

A menudo, los ingenieros de datos construyen canalizaciones de datos como parte de sus procesos habituales de ingestión de datos y ETL. Rellenan bases de datos y tablas Spark SQL con datos depurados para que los consuman las aplicaciones posteriores.

Supongamos que tienes una base de datos existente, learn_spark_db, y una tabla, us_delay_flights_tbl, lista para usar. En lugar de leer de un archivo JSON externo, puedes utilizar simplemente SQL para consultar la tabla y asignar el resultado devuelto a un DataFrame:

// In Scala
val usFlightsDF = spark.sql("SELECT * FROM us_delay_flights_tbl")
val usFlightsDF2 = spark.table("us_delay_flights_tbl")
# In Python
us_flights_df = spark.sql("SELECT * FROM us_delay_flights_tbl")
us_flights_df2 = spark.table("us_delay_flights_tbl")

Ahora tienes un DataFrame depurado leído de una tabla SQL existente en Spark. También puedes leer datos en otros formatos utilizando las fuentes de datos incorporadas de Spark, lo que te da flexibilidad para interactuar con varios formatos de archivo comuness.

Fuentes de datos para marcos de datos y tablas SQL

Como se muestra en la Figura 4-1, Spark SQL proporciona una interfaz para diversas fuentes de datos. También proporciona un conjunto de métodos comunes para leer y escribir datos en y desde estas fuentes de datos utilizando la API de Fuentes de Datos.

En esta sección cubriremos algunas de las fuentes de datos incorporadas, los formatos de archivo disponibles y las formas de cargar y escribir datos, junto con opciones específicas relativas a estas fuentes de datos. Pero antes, veamos más de cerca dos construcciones de alto nivel de la API de fuentes de datos que dictan la forma en que interactúas con las distintas fuentes de datos: DataFrameReader y DataFrameWriter.

LectorDeDatos

DataFrameReader es la construcción básica para leer datos de una fuente de datos en un Marco de datos. Tiene un formato definido y un patrón de uso recomendado:

DataFrameReader.format(args).option("key", "value").schema(args).load()

Este patrón de encadenar métodos es común en Spark, y fácil de leer. Lo vimos en el Capítulo 3 cuando exploramos los patrones habituales de análisis de datos.

Ten en cuenta que sólo puedes acceder a un DataFrameReader a través de una instancia de SparkSession. Es decir, no puedes crear una instancia de DataFrameReader. Para obtener un manejador de instancia de él, utiliza:

SparkSession.read 
// or 
SparkSession.readStream

Mientras que read devuelve un "handle" a DataFrameReader para leer en un DataFrame desde una fuente de datos estática, readStream devuelve una instancia para leer desde una fuente de streaming. (Trataremos el Streaming Estructurado más adelante en el libro).

Los argumentos de cada uno de los métodos públicos de DataFrameReader toman distintos valores. La Tabla 4-1 los enumera, con un subconjunto de los argumentos admitidos .

Tabla 4-1. Métodos, argumentos y opciones de DataFrameReader
Método Argumentos Descripción
format() "parquet", "csv", "txt", "json", "jdbc", "orc", "avro", etc. Si no especificas este método, entonces el predeterminado es Parquet o el que se establezca en spark.sql.sources.default.
option() ("mode", {PERMISSIVE | FAILFAST | DROPMALFORMED } )
("inferSchema", {true | false})
("path", "path_file_data_source")
Una serie de pares clave/valor y opciones.
La documentación de Spark muestra algunos ejemplos y explica los distintos modos y sus acciones. El modo por defecto es PERMISSIVE. Las opciones "inferSchema" y "mode" son específicas de los formatos de archivo JSON y CSV.
schema() DDL String o StructType, por ejemplo, 'A INT, B STRING' o
StructType(...)
Para los formatos JSON o CSV, puedes especificar que se infiera el esquema en el método option(). En general, proporcionar un esquema para cualquier formato hace que la carga sea más rápida y garantiza que tus datos se ajusten al esquema esperado.
load() "/path/to/data/source" La ruta a la fuente de datos. Puede estar vacía si se especifica en option("path", "...").

Aunque no vamos a enumerar exhaustivamente todas las diferentes combinaciones de argumentos y opciones, la documentación de Python, Scala, R y Java ofrece sugerencias y orientación. No obstante, merece la pena mostrar un par de ejemplos:

// In Scala
// Use Parquet 
val file = """/databricks-datasets/learning-spark-v2/flights/summary-
  data/parquet/2010-summary.parquet"""
val df = spark.read.format("parquet").load(file) 
// Use Parquet; you can omit format("parquet") if you wish as it's the default
val df2 = spark.read.load(file)
// Use CSV
val df3 = spark.read.format("csv")
  .option("inferSchema", "true")
  .option("header", "true")
  .option("mode", "PERMISSIVE")
  .load("/databricks-datasets/learning-spark-v2/flights/summary-data/csv/*")
// Use JSON
val df4 = spark.read.format("json")
  .load("/databricks-datasets/learning-spark-v2/flights/summary-data/json/*")
Nota

En general, no se necesita ningún esquema cuando se lee de una fuente de datos Parquet estática: los metadatos Parquet suelen contener el esquema, por lo que se infiere. Sin embargo, para las fuentes de datos en streaming tendrás que proporcionar un esquema. (Trataremos la lectura de fuentes de datos en streaming en el Capítulo 8).

Parquet es la fuente de datos predeterminada y preferida de Spark porque es eficiente, utiliza almacenamiento columnar y emplea un rápido algoritmo de compresión. Verás ventajas adicionales más adelante (como el pushdown columnar), cuando tratemos el optimizador Catalyst en mayor profundidad.

EscritorDeDatos

DataFrameWriter hace lo contrario que su homólogo: guarda o escribe datos en una fuente de datos incorporada especificada. A diferencia de DataFrameReader, no accedes a su instancia desde SparkSession, sino desde el DataFrame que deseas guardar. Tiene algunos patrones de uso recomendados:

DataFrameWriter.format(args)
  .option(args)
  .bucketBy(args)
  .partitionBy(args)
  .save(path)

DataFrameWriter.format(args).option(args).sortBy(args).saveAsTable(table)

Para obtener un manejador de instancia, utiliza

DataFrame.write
// or 
DataFrame.writeStream

Los argumentos de cada uno de los métodos a DataFrameWriter también toman valores diferentes. Los enumeramos en la Tabla 4-2, con un subconjunto de los argumentos admitidos .

Tabla 4-2. Métodos, argumentos y opciones de DataFrameWriter
Método Argumentos Descripción
format() "parquet", "csv", "txt", "json", "jdbc", "orc", "avro", etc. Si no especificas este método, entonces el predeterminado es Parquet o lo que se establezca en spark.sql.sources.default.
option() ("mode", {append | overwrite | ignore | error or errorifexists} )
("mode", {SaveMode.Overwrite | SaveMode.Append, SaveMode.Ignore, SaveMode.ErrorIfExists})
("path", "path_to_write_to")
Una serie de pares clave/valor y opciones. La documentación de Spark muestra algunos ejemplos. Se trata de un método sobrecargado. Las opciones de modo por defecto son error or errorifexists y SaveMode.ErrorIfExists; lanzan una excepción en tiempo de ejecución si los datos ya existen.
bucketBy() (numBuckets, col, col..., coln) El número de cubos y los nombres de las columnas por los que hacer los cubos. Utiliza el esquema de bucketing de Hive en un sistema de archivos.
save() "/path/to/data/source" La ruta en la que guardar. Puede estar vacía si se especifica en option("path", "...").
saveAsTable() "table_name" La tabla en la que guardar.

He aquí un breve fragmento de ejemplo para ilustrar el uso de métodos y argumentos:

// In Scala
// Use JSON
val location = ... 
df.write.format("json").mode("overwrite").save(location)

Parquet

Comenzaremos nuestra exploración de las fuentes de datos con Parquet, porque es la fuente de datos por defecto en Spark. Apoyado y ampliamente utilizado por muchos marcos y plataformas de procesamiento de big data, Parquet es un formato de archivo columnar de código abierto que ofrece muchas optimizaciones de E/S (como la compresión, que ahorra espacio de almacenamiento y permite un acceso rápido a las columnas de datos).

Debido a su eficacia y a estas optimizaciones, te recomendamos que después de haber transformado y limpiado tus datos, guardes tus DataFrames en formato Parquet para su consumo posterior. (Parquet es también el formato de apertura de tablas por defecto de Delta Lake, del que hablaremos en el Capítulo 9).

Lectura de archivos Parquet en un DataFrame

Los archivos Parquet se almacenan en una estructura de directorios que contiene los archivos de datos, los metadatos, una serie de archivos comprimidos y algunos archivos de estado. Los metadatos del pie de página contienen la versión del formato de archivo, el esquema y datos de columna como la ruta, etc.

Por ejemplo, un directorio de un archivo Parquet puede contener un conjunto de archivos como éste:

_SUCCESS
_committed_1799640464332036264
_started_1799640464332036264
part-00000-tid-1799640464332036264-91273258-d7ef-4dc7-<...>-c000.snappy.parquet

Puede haber varios archivos comprimidos en parte-XXXX en un directorio (los nombres que se muestran aquí se han acortado para que quepan en la página).

Para leer archivos Parquet en un Marco de datos, sólo tienes que especificar el formato y la ruta:

// In Scala
val file = """/databricks-datasets/learning-spark-v2/flights/summary-data/
  parquet/2010-summary.parquet/"""
val df = spark.read.format("parquet").load(file)
# In Python
file = """/databricks-datasets/learning-spark-v2/flights/summary-data/parquet/
  2010-summary.parquet/"""
df = spark.read.format("parquet").load(file)

A menos que estés leyendo de una fuente de datos en streaming, no es necesario que proporciones el esquema, porque Parquet lo guarda como parte de sus metadatos.

Lectura de archivos Parquet en una tabla SQL de Spark

Además de leer archivos Parquet en un Spark DataFrame, también puedes crear una tabla o vista no gestionada Spark SQL directamente utilizando SQL:

-- In SQL
CREATE OR REPLACE TEMPORARY VIEW us_delay_flights_tbl
    USING parquet
    OPTIONS (
      path "/databricks-datasets/learning-spark-v2/flights/summary-data/parquet/
      2010-summary.parquet/" )

Una vez que hayas creado la tabla o vista, puedes leer datos en un Marco de datos utilizando SQL, como vimos en algunos ejemplos anteriores:

// In Scala
spark.sql("SELECT * FROM us_delay_flights_tbl").show()
# In Python
spark.sql("SELECT * FROM us_delay_flights_tbl").show()

Ambas operaciones devuelven los mismos resultados:

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|United States    |Romania            |1    |
|United States    |Ireland            |264  |
|United States    |India              |69   |
|Egypt            |United States      |24   |
|Equatorial Guinea|United States      |1    |
|United States    |Singapore          |25   |
|United States    |Grenada            |54   |
|Costa Rica       |United States      |477  |
|Senegal          |United States      |29   |
|United States    |Marshall Islands   |44   |
+-----------------+-------------------+-----+
only showing top 10 rows

Escribir DataFrames en archivos Parquet

Escribir o guardar un DataFrame como tabla o archivo es una operación habitual en Spark. Para escribir un DataFrame sólo tienes que utilizar los métodos y argumentos de DataFrameWriter descritos anteriormente en este capítulo, proporcionando la ubicación en la que guardar los archivos Parquet. Por ejemplo

// In Scala
df.write.format("parquet")
  .mode("overwrite")
  .option("compression", "snappy")
  .save("/tmp/data/parquet/df_parquet")
# In Python
(df.write.format("parquet")
  .mode("overwrite")
  .option("compression", "snappy")
  .save("/tmp/data/parquet/df_parquet"))
Nota

Recuerda que Parquet es el formato de archivo por defecto. Si no incluyes el método format(), el DataFrame se seguirá guardando como un archivo Parquet.

Esto creará un conjunto de archivos Parquet compactos y comprimidos en la ruta especificada. Como aquí hemos utilizado snappy como opción de compresión, tendremos archivos comprimidos snappy. Por brevedad, este ejemplo sólo ha generado un archivo; normalmente, puede haber una docena de archivos creados:

-rw-r--r--  1 jules  wheel    0 May 19 10:58 _SUCCESS
-rw-r--r--  1 jules  wheel  966 May 19 10:58 part-00000-<...>-c000.snappy.parquet

Escribir DataFrames en tablas SQL de Spark

Escribir un DataFrame en una tabla SQL es tan fácil como escribir en un archivo: sólo tienes que utilizar saveAsTable() en lugar de save(). Esto creará una tabla gestionada llamada us_delay_flights_tbl:

// In Scala
df.write
  .mode("overwrite")
  .saveAsTable("us_delay_flights_tbl")
# In Python
(df.write
  .mode("overwrite")
  .saveAsTable("us_delay_flights_tbl"))

En resumen, Parquet es el formato de archivo de origen de datos incorporado preferido y por defecto en Spark, y ha sido adoptado por muchos otros frameworks. Te recomendamos que utilices este formato en tus procesos ETL y de ingestión de datos.

JSON

La Notación de Objetos JavaScript (JSON) también es un formato de datos popular. Saltó a la fama por ser un formato fácil de leer y de descifrar en comparación con XML. Tiene dos formatos de representación el modo de una sola línea y el modo de varias líneas. Spark admite ambos modos.

En el modo de una sola línea , cada línea denota un único objeto JSON, mientras que en el modo multilínea, todo el objeto multilínea constituye un único objeto JSON. Para leer en este modo, establece multiLine a true en el método option().

Leer un archivo JSON en un DataFrame

Puedes leer un archivo JSON en un DataFrame del mismo modo que lo hacías con Parquet: sólo tienes que especificar "json" en el método format():

// In Scala
val file = "/databricks-datasets/learning-spark-v2/flights/summary-data/json/*"
val df = spark.read.format("json").load(file)
# In Python
file = "/databricks-datasets/learning-spark-v2/flights/summary-data/json/*"
df = spark.read.format("json").load(file)

Lectura de un archivo JSON en una tabla SQL de Spark

También puedes crear una tabla SQL a partir de un archivo JSON, igual que hiciste con Parquet:

-- In SQL
CREATE OR REPLACE TEMPORARY VIEW us_delay_flights_tbl
    USING json
    OPTIONS (
      path  "/databricks-datasets/learning-spark-v2/flights/summary-data/json/*"
    )

Una vez creada la tabla, puedes leer los datos en un Marco de datos utilizando SQL:

// In Scala/Python
spark.sql("SELECT * FROM us_delay_flights_tbl").show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|United States    |Romania            |15   |
|United States    |Croatia            |1    |
|United States    |Ireland            |344  |
|Egypt            |United States      |15   |
|United States    |India              |62   |
|United States    |Singapore          |1    |
|United States    |Grenada            |62   |
|Costa Rica       |United States      |588  |
|Senegal          |United States      |40   |
|Moldova          |United States      |1    |
+-----------------+-------------------+-----+
only showing top 10 rows

Escribir DataFrames en archivos JSON

Guardar un DataFrame como archivo JSON es sencillo. Especifica los métodos y argumentos DataFrameWriter y proporciona la ubicación en la que guardar los archivos JSON:

// In Scala
df.write.format("json")
  .mode("overwrite")
  .option("compression", "snappy")
  .save("/tmp/data/json/df_json")
# In Python
(df.write.format("json")
  .mode("overwrite")
  .option("compression", "snappy")
  .save("/tmp/data/json/df_json"))

Crea un directorio en la ruta especificada con un conjunto de archivos JSON compactos:

-rw-r--r--  1 jules  wheel   0 May 16 14:44 _SUCCESS
-rw-r--r--  1 jules  wheel  71 May 16 14:44 part-00000-<...>-c000.json

Opciones de la fuente de datos JSON

La Tabla 4-3 describe las opciones JSON habituales para DataFrameReader y DataFrameWriter. Para obtener una lista completa, te remitimos a la documentación .

Tabla 4-3. Opciones JSON para DataFrameReader y DataFrameWriter
Nombre de la propiedad Valores Significado Alcance
compression none, uncompressed, bzip2, deflate, gzip, lz4, o snappy Utiliza este códec de compresión para escribir. Ten en cuenta que la lectura sólo detectará la compresión o el códec a partir de la extensión del archivo. Escribe
dateFormat yyyy-MM-dd o DateTimeFormatter Utiliza este formato o cualquier formato de la página de Java DateTimeFormatter. Lectura/escritura
multiLine true, false Utiliza el modo multilínea. Por defecto es false (modo de una sola línea). Leer
allowUnquotedFieldNames true, false Permitir nombres de campo JSON sin entrecomillar. Por defecto es false. Leer

CSV

Tan utilizado como los archivos de texto sin formato, este formato de archivo de texto común captura cada dato o campo delimitado por una coma; cada línea con campos separados por comas representa un registro. Aunque la coma es el separador por defecto, puedes utilizar otros delimitadores para separar campos en los casos en que las comas formen parte de tus datos. Las hojas de cálculo más populares pueden generar archivos CSV, por lo que es un formato popular entre los analistas de datos y de negocio.

Leer un archivo CSV en un DataFrame

Al igual que con las otras fuentes de datos incorporadas, puedes utilizar los métodos y argumentos de DataFrameReader para leer un archivo CSV en un DataFrame:

// In Scala
val file = "/databricks-datasets/learning-spark-v2/flights/summary-data/csv/*"
val schema = "DEST_COUNTRY_NAME STRING, ORIGIN_COUNTRY_NAME STRING, count INT"

val df = spark.read.format("csv")
  .schema(schema)
  .option("header", "true")
  .option("mode", "FAILFAST")     // Exit if any errors
  .option("nullValue", "")        // Replace any null data with quotes
  .load(file)
# In Python
file = "/databricks-datasets/learning-spark-v2/flights/summary-data/csv/*"
schema = "DEST_COUNTRY_NAME STRING, ORIGIN_COUNTRY_NAME STRING, count INT"
df = (spark.read.format("csv")
  .option("header", "true")
  .schema(schema)
  .option("mode", "FAILFAST")  # Exit if any errors
  .option("nullValue", "")     # Replace any null data field with quotes
  .load(file))

Lectura de un archivo CSV en una tabla SQL de Spark

Crear una tabla SQL a partir de una fuente de datos CSV no difiere de utilizar Parquet o JSON:

-- In SQL
CREATE OR REPLACE TEMPORARY VIEW us_delay_flights_tbl
    USING csv
    OPTIONS (
      path "/databricks-datasets/learning-spark-v2/flights/summary-data/csv/*",
      header "true",
      inferSchema "true",
      mode "FAILFAST"
    )

Una vez creada la tabla, puedes leer los datos en un Marco de datos utilizando SQL como antes:

// In Scala/Python
spark.sql("SELECT * FROM us_delay_flights_tbl").show(10)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|United States    |Romania            |1    |
|United States    |Ireland            |264  |
|United States    |India              |69   |
|Egypt            |United States      |24   |
|Equatorial Guinea|United States      |1    |
|United States    |Singapore          |25   |
|United States    |Grenada            |54   |
|Costa Rica       |United States      |477  |
|Senegal          |United States      |29   |
|United States    |Marshall Islands   |44   |
+-----------------+-------------------+-----+
only showing top 10 rows

Escribir DataFrames en archivos CSV

Guardar un DataFrame como archivo CSV es sencillo. Especifica los métodos y argumentos apropiados de DataFrameWriter, y proporciona la ubicación en la que guardar los archivos CSV:

// In Scala
df.write.format("csv").mode("overwrite").save("/tmp/data/csv/df_csv")
# In Python
df.write.format("csv").mode("overwrite").save("/tmp/data/csv/df_csv")

Esto genera una carpeta en la ubicación especificada, poblada con un montón de archivos comprimidos y compactos:

-rw-r--r--  1 jules  wheel   0 May 16 12:17 _SUCCESS
-rw-r--r--  1 jules  wheel  36 May 16 12:17 part-00000-251690eb-<...>-c000.csv

Opciones de fuente de datos CSV

La Tabla 4-4 describe algunas de las opciones comunes de CSV para DataFrameReader y DataFrameWriter. Como los archivos CSV pueden ser complejos, hay muchas opciones disponibles; para obtener una lista completa te remitimos a la documentación .

Tabla 4-4. Opciones CSV para DataFrameReader y DataFrameWriter
Nombre de la propiedad Valores Significado Alcance
compression none, bzip2, deflate, gzip, lz4, o snappy Utiliza este códec de compresión para escribir. Escribe
dateFormat yyyy-MM-dd o DateTimeFormatter Utiliza este formato o cualquier formato de la página de Java DateTimeFormatter. Lectura/escritura
multiLine true, false Utiliza el modo multilínea. Por defecto es false (modo de una sola línea). Leer
inferSchema true, false Si true, Spark determinará los tipos de datos de las columnas. Por defecto es false. Leer
sep Cualquier personaje Utiliza este carácter para separar los valores de las columnas de una fila. El delimitador por defecto es una coma (,). Lectura/escritura
escape Cualquier personaje Utiliza este carácter para escapar de las comillas. Por defecto es \. Lectura/escritura
header true, false Indica si la primera línea es una cabecera que indica el nombre de cada columna. Por defecto es false. Lectura/escritura

Avro

Introducido en Spark 2.4 como fuente de datos incorporada, el formato Avro es utilizado, por ejemplo, por Apache Kafka para serializar y deserializar mensajes. Ofrece muchas ventajas, como el mapeo directo a JSON, velocidad y eficiencia, y enlaces disponibles para muchos lenguajes de programación.

Leer un archivo Avro en un DataFrame

La lectura de un archivo Avro en un DataFrame utilizando DataFrameReader es coherente en su uso con las otras fuentes de datos que hemos tratado en esta sección:

// In Scala
val df = spark.read.format("avro")
 .load("/databricks-datasets/learning-spark-v2/flights/summary-data/avro/*")
df.show(false)
# In Python
df = (spark.read.format("avro")
  .load("/databricks-datasets/learning-spark-v2/flights/summary-data/avro/*"))
df.show(truncate=False)


+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|United States    |Romania            |1    |
|United States    |Ireland            |264  |
|United States    |India              |69   |
|Egypt            |United States      |24   |
|Equatorial Guinea|United States      |1    |
|United States    |Singapore          |25   |
|United States    |Grenada            |54   |
|Costa Rica       |United States      |477  |
|Senegal          |United States      |29   |
|United States    |Marshall Islands   |44   |
+-----------------+-------------------+-----+
only showing top 10 rows

Lectura de un archivo Avro en una tabla SQL de Spark

De nuevo, crear tablas SQL utilizando una fuente de datos Avro no difiere de utilizar Parquet, JSON o CSV:

-- In SQL 
CREATE OR REPLACE TEMPORARY VIEW episode_tbl
    USING avro
    OPTIONS (
      path "/databricks-datasets/learning-spark-v2/flights/summary-data/avro/*"
    )

Una vez que hayas creado una tabla, puedes leer datos en un Marco de datos utilizando SQL:

// In Scala
spark.sql("SELECT * FROM episode_tbl").show(false)
# In Python
spark.sql("SELECT * FROM episode_tbl").show(truncate=False)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|United States    |Romania            |1    |
|United States    |Ireland            |264  |
|United States    |India              |69   |
|Egypt            |United States      |24   |
|Equatorial Guinea|United States      |1    |
|United States    |Singapore          |25   |
|United States    |Grenada            |54   |
|Costa Rica       |United States      |477  |
|Senegal          |United States      |29   |
|United States    |Marshall Islands   |44   |
+-----------------+-------------------+-----+
only showing top 10 rows

Escribir DataFrames en archivos Avro

Escribir un Marco de datos como archivo Avro es sencillo. Como de costumbre, especifica los métodos y argumentos apropiados de DataFrameWriter, y proporciona la ubicación en la que guardar los archivos Avro:

// In Scala
df.write
  .format("avro")
  .mode("overwrite")
  .save("/tmp/data/avro/df_avro")
# In Python
(df.write
  .format("avro")
  .mode("overwrite")
  .save("/tmp/data/avro/df_avro"))

Esto genera una carpeta en la ubicación especificada, poblada con un montón de archivos comprimidos y compactos:

-rw-r--r--  1 jules  wheel    0 May 17 11:54 _SUCCESS
-rw-r--r--  1 jules  wheel  526 May 17 11:54 part-00000-ffdf70f4-<...>-c000.avro

Opciones de fuente de datos Avro

En la Tabla 4-5 se describen las opciones habituales de DataFrameReader y DataFrameWriter. Encontrarás una lista completa de opciones en la documentación .

Tabla 4-5. Opciones Avro para DataFrameReader y DataFrameWriter
Nombre de la propiedad Valor por defecto Significado Alcance
avroSchema Ninguno Esquema Avro opcional proporcionado por un usuario en formato JSON. El tipo de datos y la denominación de los campos del registro deben coincidir con los datos Avro de entrada o con los datos Catalyst (tipo de datos interno de Spark), de lo contrario la acción de lectura/escritura fallará. Lectura/escritura
recordName topLevelRecord Nombre del registro de nivel superior en el resultado de la escritura, que se requiere en la especificación Avro. Escribe
recordNamespace "" Registra el espacio de nombres en el resultado de la escritura. Escribe
ignoreExtension true Si esta opción está activada, se cargan todos los archivos (con y sin la extensión .avro ). De lo contrario, se ignoran los archivos sin la extensión .avro. Leer
compression snappy Te permite especificar el códec de compresión a utilizar en la escritura. Los códecs admitidos actualmente son uncompressed, snappy, deflate, bzip2 y xz.
Si no se establece esta opción, se tiene en cuenta el valor de spark.sql.avro.compression.codec.
Escribe

ORC

Como formato de archivo columnar optimizado adicional, Spark 2.x admite un lector ORC vectorizado. Dos configuraciones de Spark dictan qué implementación de ORC utilizar. Cuando spark.sql.orc.impl está configurado como native y spark.sql.orc.enableVectorizedReader como true, Spark utiliza el lector ORC vectorizado. Un lector vectorizado lee bloques de filas (a menudo 1.024 por bloque) en lugar de una fila cada vez, lo que agiliza las operaciones y reduce el uso de CPU en operaciones intensivas como escaneos, filtros, agregaciones y uniones.

Para las tablas Hive ORC SerDe (serialización y deserialización) creadas con el comando SQL USING HIVE OPTIONS (fileFormat 'ORC'), se utiliza el lector vectorizado cuando el parámetro de configuración de Spark spark.sql.hive.convertMetastoreOrc se establece en true.

Leer un archivo ORC en un Marco de datos

Para leer en un DataFrame utilizando el lector vectorizado ORC, sólo tienes que utilizar los métodos y opciones normales de DataFrameReader:

// In Scala 
val file = "/databricks-datasets/learning-spark-v2/flights/summary-data/orc/*"
val df = spark.read.format("orc").load(file)
df.show(10, false)
# In Python
file = "/databricks-datasets/learning-spark-v2/flights/summary-data/orc/*"
df = spark.read.format("orc").option("path", file).load()
df.show(10, False)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|United States    |Romania            |1    |
|United States    |Ireland            |264  |
|United States    |India              |69   |
|Egypt            |United States      |24   |
|Equatorial Guinea|United States      |1    |
|United States    |Singapore          |25   |
|United States    |Grenada            |54   |
|Costa Rica       |United States      |477  |
|Senegal          |United States      |29   |
|United States    |Marshall Islands   |44   |
+-----------------+-------------------+-----+
only showing top 10 rows

Lectura de un archivo ORC en una tabla SQL de Spark

No hay ninguna diferencia con Parquet, JSON, CSV o Avro a la hora de crear una vista SQL utilizando una fuente de datos ORC:

-- In SQL
CREATE OR REPLACE TEMPORARY VIEW us_delay_flights_tbl
    USING orc
    OPTIONS (
      path "/databricks-datasets/learning-spark-v2/flights/summary-data/orc/*"
    )

Una vez creada una tabla, puedes leer datos en un Marco de datos utilizando SQL como de costumbre:

// In Scala/Python
spark.sql("SELECT * FROM us_delay_flights_tbl").show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|United States    |Romania            |1    |
|United States    |Ireland            |264  |
|United States    |India              |69   |
|Egypt            |United States      |24   |
|Equatorial Guinea|United States      |1    |
|United States    |Singapore          |25   |
|United States    |Grenada            |54   |
|Costa Rica       |United States      |477  |
|Senegal          |United States      |29   |
|United States    |Marshall Islands   |44   |
+-----------------+-------------------+-----+
only showing top 10 rows

Escribir DataFrames en archivos ORC

Volver a escribir un DataFrame transformado después de leerlo es igualmente sencillo utilizando los métodos DataFrameWriter métodos:

// In Scala
df.write.format("orc")
  .mode("overwrite")
  .option("compression", "snappy")
  .save("/tmp/data/orc/df_orc")
# In Python
(df.write.format("orc")
  .mode("overwrite")
  .option("compression", "snappy")
  .save("/tmp/data/orc/flights_orc"))

El resultado será una carpeta en la ubicación especificada que contendrá algunos archivos ORC comprimidos:

-rw-r--r--  1 jules  wheel    0 May 16 17:23 _SUCCESS
-rw-r--r--  1 jules  wheel  547 May 16 17:23 part-00000-<...>-c000.snappy.orc

Imágenes

En Spark 2.4, la comunidad introdujo una nueva fuente de datos, los archivos de imagen, para dar soporte a marcos de aprendizaje profundo y aprendizaje automático como TensorFlow y PyTorch. Para las aplicaciones de aprendizaje automático basadas en la visión por ordenador, es importante cargar y procesar conjuntos de datos de imágenes.

Leer un archivo de imagen en un DataFrame

Como con todos los formatos de archivo anteriores, puedes utilizar los métodos y opciones de DataFrameReader para leer un archivo de imagen, como se muestra aquí:

// In Scala
import org.apache.spark.ml.source.image

val imageDir = "/databricks-datasets/learning-spark-v2/cctvVideos/train_images/"
val imagesDF = spark.read.format("image").load(imageDir)

imagesDF.printSchema

imagesDF.select("image.height", "image.width", "image.nChannels", "image.mode", 
  "label").show(5, false)
# In Python
from pyspark.ml import image

image_dir = "/databricks-datasets/learning-spark-v2/cctvVideos/train_images/"
images_df = spark.read.format("image").load(image_dir)
images_df.printSchema()

root
 |-- image: struct (nullable = true)
 |    |-- origin: string (nullable = true)
 |    |-- height: integer (nullable = true)
 |    |-- width: integer (nullable = true)
 |    |-- nChannels: integer (nullable = true)
 |    |-- mode: integer (nullable = true)
 |    |-- data: binary (nullable = true)
 |-- label: integer (nullable = true)

images_df.select("image.height", "image.width", "image.nChannels", "image.mode", 
  "label").show(5, truncate=False)

+------+-----+---------+----+-----+
|height|width|nChannels|mode|label|
+------+-----+---------+----+-----+
|288   |384  |3        |16  |0    |
|288   |384  |3        |16  |1    |
|288   |384  |3        |16  |0    |
|288   |384  |3        |16  |0    |
|288   |384  |3        |16  |0    |
+------+-----+---------+----+-----+
only showing top 5 rows

Archivos binarios

Spark 3.0 añade soporte para archivos binarios como fuente de datos. La página DataFrameReader convierte cada archivo binario en una única fila (registro) DataFrame que contiene el contenido bruto y los metadatos del archivo. La fuente de datos de archivos binarios produce un DataFrame con las siguientes columnas:

  • camino: StringType

  • modificationTime: TimestampType

  • longitud: TipoLargo

  • contenido: TipoBinario

Leer un archivo binario en un DataFrame

Para leer archivos binarios, especifica el formato de la fuente de datos como binaryFile. Puedes cargar archivos con rutas que coincidan con un patrón global dado, conservando el comportamiento del descubrimiento de particiones con la opción de fuente de datos pathGlobFilter. Por ejemplo, el siguiente código lee todos los archivos JPG del directorio de entrada con cualquier directorio particionado:

// In Scala
val path = "/databricks-datasets/learning-spark-v2/cctvVideos/train_images/"
val binaryFilesDF = spark.read.format("binaryFile")
  .option("pathGlobFilter", "*.jpg")
  .load(path)
binaryFilesDF.show(5)
# In Python
path = "/databricks-datasets/learning-spark-v2/cctvVideos/train_images/"
binary_files_df = (spark.read.format("binaryFile")
  .option("pathGlobFilter", "*.jpg")
  .load(path))
binary_files_df.show(5)

+--------------------+-------------------+------+--------------------+-----+
|                path|   modificationTime|length|             content|label|
+--------------------+-------------------+------+--------------------+-----+
|file:/Users/jules...|2020-02-12 12:04:24| 55037|[FF D8 FF E0 00 1...|    0|
|file:/Users/jules...|2020-02-12 12:04:24| 54634|[FF D8 FF E0 00 1...|    1|
|file:/Users/jules...|2020-02-12 12:04:24| 54624|[FF D8 FF E0 00 1...|    0|
|file:/Users/jules...|2020-02-12 12:04:24| 54505|[FF D8 FF E0 00 1...|    0|
|file:/Users/jules...|2020-02-12 12:04:24| 54475|[FF D8 FF E0 00 1...|    0|
+--------------------+-------------------+------+--------------------+-----+
only showing top 5 rows

Para ignorar el descubrimiento de datos de partición en un directorio, puedes configurar recursiveFileLookup en "true":

// In Scala
val binaryFilesDF = spark.read.format("binaryFile")
  .option("pathGlobFilter", "*.jpg")
  .option("recursiveFileLookup", "true")
  .load(path)
binaryFilesDF.show(5)
# In Python
binary_files_df = (spark.read.format("binaryFile")
  .option("pathGlobFilter", "*.jpg")
  .option("recursiveFileLookup", "true")
  .load(path))
binary_files_df.show(5)

+--------------------+-------------------+------+--------------------+
|                path|   modificationTime|length|             content|
+--------------------+-------------------+------+--------------------+
|file:/Users/jules...|2020-02-12 12:04:24| 55037|[FF D8 FF E0 00 1...|
|file:/Users/jules...|2020-02-12 12:04:24| 54634|[FF D8 FF E0 00 1...|
|file:/Users/jules...|2020-02-12 12:04:24| 54624|[FF D8 FF E0 00 1...|
|file:/Users/jules...|2020-02-12 12:04:24| 54505|[FF D8 FF E0 00 1...|
|file:/Users/jules...|2020-02-12 12:04:24| 54475|[FF D8 FF E0 00 1...|
+--------------------+-------------------+------+--------------------+
only showing top 5 rows

Ten en cuenta que la columna label está ausente cuando la opción recursiveFileLookup está configurada como "true".

Actualmente, la fuente de datos de archivos binarios no permite volver a escribir un Marco de datos en el formato de archivo original.

En esta sección, has visto cómo leer datos en un Marco de datos desde una serie de formatos de archivo compatibles. También te mostramos cómo crear vistas y tablas temporales a partir de las fuentes de datos incorporadas existentes. Tanto si utilizas la API del DataFrame como SQL, las consultas producen resultados idénticos. Puedes examinar algunas de estas consultas en el cuaderno disponible en el repositorio de GitHub de este libro.

Resumen

Para recapitular, este capítulo ha explorado la interoperabilidad entre la API de DataFrame y Spark SQL. En concreto, te has hecho una idea de cómo utilizar Spark SQL para:

  • Crea tablas gestionadas y no gestionadas utilizando Spark SQL y la API DataFrame.

  • Lee y escribe en varias fuentes de datos y formatos de archivo incorporados.

  • Utiliza la interfaz programática spark.sql para emitir consultas SQL sobre datos estructurados almacenados como tablas o vistas SQL de Spark.

  • Navega por Spark Catalog para inspeccionar los metadatos asociados a tablas y vistas.

  • Utiliza las API DataFrameWriter y DataFrameReader.

A través de los fragmentos de código del capítulo y de los cuadernos disponibles en el repositorio GitHub del libro, te has hecho una idea de cómo utilizar DataFrames y Spark SQL. Siguiendo en esta línea, el siguiente capítulo explora más a fondo cómo interactúa Spark con las fuentes de datos externas que se muestran en la Figura 4-1. Verás algunos ejemplos más detallados de transformaciones y de la interoperabilidad entre la API de DataFrame y Spark SQL .

Get Aprender Spark, 2ª Edición now with the O’Reilly learning platform.

O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.