Capítulo 4. Spark SQL y DataFrames: Introducción a las fuentes de datos incorporadas
Este trabajo se ha traducido utilizando IA. Agradecemos tus opiniones y comentarios: translation-feedback@oreilly.com
En el capítulo anterior, explicamos la evolución y la justificación de la estructura en Spark. En particular, discutimos cómo el motor Spark SQL proporciona una base unificada para las API de alto nivel DataFrame y Dataset. Ahora, continuaremos nuestra discusión sobre el DataFrame y exploraremos su interoperabilidad con Spark SQL.
Este capítulo y el siguiente también exploran cómo interactúa Spark SQL con algunos de los componentes externos que se muestran en la Figura 4-1.
En particular, Spark SQL:
Proporciona el motor sobre el que se construyen las API estructuradas de alto nivel que exploramos en el Capítulo 3.
Puede leer y escribir datos en diversos formatos estructurados (por ejemplo, JSON, tablas Hive, Parquet, Avro, ORC, CSV).
Te permite consultar datos mediante conectores JDBC/ODBC desde fuentes de datos externas de inteligencia empresarial (BI) como Tableau, Power BI, Talend, o desde RDBMS como MySQL y PostgreSQL.
Proporciona una interfaz programática para interactuar con datos estructurados almacenados como tablas o vistas en una base de datos desde una aplicación Spark
Ofrece un intérprete de comandos interactivo para realizar consultas SQL en tus datos estructurados.
Admite comandos compatibles con ANSI SQL:2003 y HiveQL.
Empecemos por cómo puedes utilizar Spark SQL en una aplicación Spark.
Uso de Spark SQL en aplicaciones Spark
El SparkSession
, introducido en Spark 2.0, proporciona un punto de entrada unificado para programar Spark con las API estructuradas. Puedes utilizar SparkSession
para acceder a la funcionalidad de Spark: sólo tienes que importar la clase y crear una instancia en tu código.
Para emitir cualquier consulta SQL, utiliza el método sql()
en la instancia SparkSession
, spark
, como spark.sql("SELECT * FROM myTableName")
. Todas las consultas spark.sql
ejecutadas de este modo devuelven un DataFrame sobre el que puedes realizar más operaciones Spark si lo deseas, del tipo que exploramos en el Capítulo 3 y de las que aprenderás en este capítulo y en el siguiente.
Ejemplos de consulta básica
En esta sección veremos algunos ejemplos de consultas sobre el conjunto de datos Rendimiento puntual de las aerolíneas y causas de los retrasos en los vuelos, que contiene datos sobre vuelos estadounidenses, incluyendo fecha, retraso, distancia, origen y destino. Está disponible como archivo CSV con más de un millón de registros. Utilizando un esquema, leeremos los datos en un DataFrame y registraremos el DataFrame como una vista temporal (en breve hablaremos más de las vistas temporales) para poder consultarlo con SQL.
Los ejemplos de consulta se proporcionan en fragmentos de código, y los cuadernos de Python y Scala que contienen todo el código presentado aquí están disponibles en el repositorio GitHub del libro. Estos ejemplos te ofrecerán una muestra de cómo utilizar SQL en tus aplicaciones Spark a través de la interfaz programáticaspark.sql
. Similar a la API DataFrame en su sabor declarativo, esta interfaz te permite consultar datos estructurados en tus aplicaciones Spark .
Normalmente, en una aplicación Spark autónoma, crearás una instancia de SparkSession
manualmente, como se muestra en el siguiente ejemplo. Sin embargo, en un intérprete de comandos Spark (o en un cuaderno Databricks), SparkSession
se crea por ti y es accesible a través de la variable con el nombre apropiado spark
.
Empecemos por leer el conjunto de datos en una vista temporal:
// In Scala
import
org
.
apache
.
spark
.
sql
.
SparkSession
val
spark
=
SparkSession
.
builder
.
appName
(
"SparkSQLExampleApp"
)
.
getOrCreate
()
// Path to data set
val
csvFile
=
"/databricks-datasets/learning-spark-v2/flights/departuredelays.csv"
// Read and create a temporary view
// Infer schema (note that for larger files you may want to specify the schema)
val
df
=
spark
.
read
.
format
(
"csv"
)
.
option
(
"inferSchema"
,
"true"
)
.
option
(
"header"
,
"true"
)
.
load
(
csvFile
)
// Create a temporary view
df
.
createOrReplaceTempView
(
"us_delay_flights_tbl"
)
# In Python
from
pyspark.sql
import
SparkSession
# Create a SparkSession
spark
=
(
SparkSession
.
builder
.
appName
(
"SparkSQLExampleApp"
)
.
getOrCreate
())
# Path to data set
csv_file
=
"/databricks-datasets/learning-spark-v2/flights/departuredelays.csv"
# Read and create a temporary view
# Infer schema (note that for larger files you
# may want to specify the schema)
df
=
(
spark
.
read
.
format
(
"csv"
)
.
option
(
"inferSchema"
,
"true"
)
.
option
(
"header"
,
"true"
)
.
load
(
csv_file
))
df
.
createOrReplaceTempView
(
"us_delay_flights_tbl"
)
Nota
Si quieres especificar un esquema, puedes utilizar una cadena con formato DDL. Por ejemplo
// In Scala
val
schema
=
"date STRING, delay INT, distance INT,
origin STRING, destination STRING"
# In Python
schema
=
"`date` STRING, `delay` INT, `distance` INT,
`origin`
STRING
,
`destination`
STRING
"
Ahora que tenemos una vista temporal, podemos realizar consultas SQL utilizando Spark SQL. Estas consultas no difieren de las que podrías hacer a una tabla SQL en, por ejemplo, una base de datos MySQL o PostgreSQL. El objetivo aquí es mostrar que Spark SQL ofrece una interfaz SQL compatible con ANSI:2003, y demostrar la interoperabilidad entre SQL y DataFrames.
El conjunto de datos sobre retrasos de vuelos en EEUU tiene cinco columnas:
La columna
date
contiene una cadena como02190925
. Cuando se convierte, se asigna a02-19 09:25 am
.La columna
delay
indica el retraso en minutos entre la hora de salida programada y la real. Las salidas tempranas muestran números negativos.La columna
distance
da la distancia en millas desde el aeropuerto de origen al de destino.La columna
origin
contiene el código IATA del aeropuerto de origen.La columna
destination
contiene el código IATA del aeropuerto de destino.
Teniendo esto en cuenta, probemos algunas consultas de ejemplo con este conjunto de datos.
En primer lugar, buscaremos todos los vuelos cuya distancia sea superior a 1.000 millas:
spark.sql("""SELECT distance, origin, destination FROM us_delay_flights_tbl WHERE distance > 1000 ORDER BY distance DESC""").show(10) +--------+------+-----------+ |distance|origin|destination| +--------+------+-----------+ |4330 |HNL |JFK | |4330 |HNL |JFK | |4330 |HNL |JFK | |4330 |HNL |JFK | |4330 |HNL |JFK | |4330 |HNL |JFK | |4330 |HNL |JFK | |4330 |HNL |JFK | |4330 |HNL |JFK | |4330 |HNL |JFK | +--------+------+-----------+ only showing top 10 rows
Como muestran los resultados, todos los vuelos más largos fueron entre Honolulú (HNL) y Nueva York (JFK). A continuación, encontraremos todos los vuelos entre San Francisco (SFO) y Chicago (ORD) con al menos dos horas de retraso:
spark.sql("""SELECT date, delay, origin, destination FROM us_delay_flights_tbl WHERE delay > 120 AND ORIGIN = 'SFO' AND DESTINATION = 'ORD' ORDER by delay DESC""").show(10) +--------+-----+------+-----------+ |date |delay|origin|destination| +--------+-----+------+-----------+ |02190925|1638 |SFO |ORD | |01031755|396 |SFO |ORD | |01022330|326 |SFO |ORD | |01051205|320 |SFO |ORD | |01190925|297 |SFO |ORD | |02171115|296 |SFO |ORD | |01071040|279 |SFO |ORD | |01051550|274 |SFO |ORD | |03120730|266 |SFO |ORD | |01261104|258 |SFO |ORD | +--------+-----+------+-----------+ only showing top 10 rows
Parece que hubo muchos vuelos con retrasos importantes entre estas dos ciudades, en fechas diferentes. (Como ejercicio, convierte la columna date
a un formato legible y encuentra los días o meses en que estos retrasos fueron más frecuentes. ¿Estaban los retrasos relacionados con los meses de invierno o con las vacaciones)?
Intentemos una consulta más complicada en la que utilicemos la cláusula CASE
de SQL. En el siguiente ejemplo, queremos etiquetar todos los vuelos de EE.UU., independientemente de su origen y destino, con una indicación de los retrasos que sufrieron: Retrasos Muy Largos (> 6 horas), Retrasos Largos (2-6 horas), etc. Añadiremos estas etiquetas legibles por humanos en una nueva columna llamada Flight_Delays
:
spark.sql("""SELECT delay, origin, destination, CASE WHEN delay > 360 THEN 'Very Long Delays' WHEN delay >= 120 AND delay <= 360 THEN 'Long Delays' WHEN delay >= 60 AND delay < 120 THEN 'Short Delays' WHEN delay > 0 and delay < 60 THEN 'Tolerable Delays' WHEN delay = 0 THEN 'No Delays' ELSE 'Early' END AS Flight_Delays FROM us_delay_flights_tbl ORDER BY origin, delay DESC""").show(10) +-----+------+-----------+-------------+ |delay|origin|destination|Flight_Delays| +-----+------+-----------+-------------+ |333 |ABE |ATL |Long Delays | |305 |ABE |ATL |Long Delays | |275 |ABE |ATL |Long Delays | |257 |ABE |ATL |Long Delays | |247 |ABE |DTW |Long Delays | |247 |ABE |ATL |Long Delays | |219 |ABE |ORD |Long Delays | |211 |ABE |ATL |Long Delays | |197 |ABE |DTW |Long Delays | |192 |ABE |ORD |Long Delays | +-----+------+-----------+-------------+ only showing top 10 rows
Al igual que con las APIs DataFrame y Dataset, con la interfaz spark.sql
puedes realizar operaciones comunes de análisis de datos como las que hemos explorado en el capítulo anterior. Los cálculos realizan un recorrido idéntico en el motor SQL de Spark (para más detalles, consulta "El optimizador Catalyst" en el Capítulo 3 ), proporcionándote los mismos resultados.
Las tres consultas SQL anteriores pueden expresarse con una consulta equivalente de la API de DataFrame. Por ejemplo, la primera consulta puede expresarse en la API Python DataFrame como:
# In Python
from
pyspark.sql.functions
import
col
,
desc
(
df
.
select
(
"distance"
,
"origin"
,
"destination"
)
.
where
(
col
(
"distance"
)
>
1000
)
.
orderBy
(
desc
(
"distance"
)))
.
show
(
10
)
# Or
(
df
.
select
(
"distance"
,
"origin"
,
"destination"
)
.
where
(
"distance > 1000"
)
.
orderBy
(
"distance"
,
ascending
=
False
)
.
show
(
10
))
Esto produce los mismos resultados que la consulta SQL:
+--------+------+-----------+ |distance|origin|destination| +--------+------+-----------+ |4330 |HNL |JFK | |4330 |HNL |JFK | |4330 |HNL |JFK | |4330 |HNL |JFK | |4330 |HNL |JFK | |4330 |HNL |JFK | |4330 |HNL |JFK | |4330 |HNL |JFK | |4330 |HNL |JFK | |4330 |HNL |JFK | +--------+------+-----------+ only showing top 10 rows
Como ejercicio, intenta convertir las otras dos consultas SQL para que utilicen la API DataFrame.
Como muestran estos ejemplos, utilizar la interfaz SQL de Spark para consultar datos es similar a escribir una consulta SQL normal a una tabla de base de datos relacional. Aunque las consultas están en SQL, puedes sentir la similitud en legibilidad y semántica con las operaciones de la API DataFrame, que conociste en el Capítulo 3 y explorarás más a fondo en el próximo capítulo.
Para que puedas consultar datos estructurados como se muestra en los ejemplos anteriores, Spark gestiona todas las complejidades de la creación y gestión de vistas y tablas, tanto en memoria como en disco. Esto nos lleva a nuestro siguiente tema: cómo se crean y gestionan las tablas y las vistas .
Tablas y vistas SQL
Las tablas contienen datos. Asociados a cada tabla en Spark están sus metadatos pertinentes, que son información sobre la tabla y sus datos: el esquema, la descripción, el nombre de la tabla, el nombre de la base de datos, los nombres de las columnas, las particiones, la ubicación física donde residen los datos reales, etc. Todo esto se almacena en un metastore central.
En lugar de tener un metastore separado para las tablas Spark, Spark utiliza por defecto el metastore de Apache Hive, ubicado en /user/hive/warehouse, para persistir todos los metadatos sobre tus tablas. Sin embargo, puedes cambiar la ubicación por defecto estableciendo la variable de configuración de Spark spark.sql.warehouse.dir
en otra ubicación, que puede establecerse en un almacenamiento distribuido local o externo.
Tablas gestionadas frente a no gestionadas
Spark te permite crear dos tipos de tablas: gestionadas y no gestionadas. Para una tabla gestionada, Spark gestiona tanto los metadatos como los datos en el almacén de archivos. Éste puede ser un sistema de archivos local, HDFS o un almacén de objetos como Amazon S3 o Azure Blob. Para una tabla no gestionada, Spark sólo gestiona los metadatos, mientras que tú mismo gestionas los datos en una fuente de datos externa como Cassandra.
Con una tabla gestionada, como Spark lo gestiona todo, un comando SQL como DROP TABLE table_name
borra tanto los metadatos como los datos. Con una tabla no gestionada, el mismo comando borrará sólo los metadatos, no los datos reales. Veremos algunos ejemplos de cómo crear tablas gestionadas y no gestionadas en la siguiente sección.
Crear bases de datos y tablas SQL
Las tablas residen dentro de una base de datos. Por defecto, Spark crea tablas en la base de datos default
. Para crear tu propio nombre de base de datos, puedes emitir un comando SQL desde tu aplicación o cuaderno Spark. Utilizando el conjunto de datos de retrasos de vuelos en EE.UU., vamos a crear una tabla gestionada y otra no gestionada. Para empezar, crearemos una base de datos llamada learn_spark_db
y le diremos a Spark que queremos utilizar esa base de datos:
// In Scala/Python
spark
.
sql
(
"CREATE DATABASE learn_spark_db"
)
spark
.
sql
(
"USE learn_spark_db"
)
A partir de este punto, cualquier comando que emitamos en nuestra aplicación para crear tablas tendrá como resultado que las tablas se crearán en esta base de datos y residirán bajo el nombre de base de datos learn_spark_db
.
Crear una tabla gestionada
Para crear una tabla gestionada dentro de la base de datos learn_spark_db
, puedes realizar una consulta SQL como la siguiente:
// In Scala/Python
spark
.
sql
(
"CREATE TABLE managed_us_delay_flights_tbl (date STRING, delay INT,
distance INT, origin STRING, destination STRING)"
)
Puedes hacer lo mismo utilizando la API DataFrame de esta forma:
# In Python
# Path to our US flight delays CSV file
csv_file
=
"/databricks-datasets/learning-spark-v2/flights/departuredelays.csv"
# Schema as defined in the preceding example
schema
=
"date STRING, delay INT, distance INT, origin STRING, destination STRING"
flights_df
=
spark
.
read
.
csv
(
csv_file
,
schema
=
schema
)
flights_df
.
write
.
saveAsTable
(
"managed_us_delay_flights_tbl"
)
Ambas sentencias crearán la tabla gestionada us_delay_flights_tbl
en la base de datos learn_spark_db
.
Crear una tabla no gestionada
En cambio, puedes crear tablas no gestionadas a partir de tus propias fuentes de datos -por ejemplo, archivos Parquet, CSV o JSON almacenados en un almacén de archivos accesible a tu aplicación Spark.
Para crear una tabla no gestionada a partir de una fuente de datos, como un archivo CSV, en SQL utiliza:
spark.sql("""CREATE TABLE us_delay_flights_tbl(date STRING, delay INT, distance INT, origin STRING, destination STRING) USING csv OPTIONS (PATH '/databricks-datasets/learning-spark-v2/flights/departuredelays.csv')""")
Y dentro de la API DataFrame utiliza
(flights_df .write .option("path", "/tmp/data/us_flights_delay") .saveAsTable("us_delay_flights_tbl"))
Nota
Para que puedas explorar estos ejemplos, hemos creado cuadernos de ejemplos de Python y Scala que puedes encontrar en el repositorio GitHub del libro.
Crear vistas
Además de crear tablas, Spark puede crear vistas sobre las tablas existentes. Las vistas pueden ser globales (visibles en todos los SparkSession
s de un cluster determinado) o de sesión (visibles sólo en un SparkSession
), y son temporales: desaparecen cuando termina tu aplicación Spark.
Lacreación de vistas tiene una sintaxis similar a la creación de tablas dentro de una base de datos. Una vez creada una vista, puedes consultarla como harías con una tabla. La diferencia entre una vista y una tabla es que las vistas no conservan realmente los datos; las tablas persisten después de que finalice tu aplicación Spark, pero las vistas desaparecen.
Puedes crear una vista a partir de una tabla existente utilizando SQL. Por ejemplo, si deseas trabajar sólo con el subconjunto del conjunto de datos de retrasos de vuelos en EE.UU. con los aeropuertos de origen de Nueva York (JFK) y San Francisco (SFO), las siguientes consultas crearán vistas temporales y temporales globales consistentes sólo en esa porción de la tabla:
-- In SQL
CREATE
OR
REPLACE
GLOBAL
TEMP
VIEW
us_origin_airport_SFO_global_tmp_view
AS
SELECT
date
,
delay
,
origin
,
destination
from
us_delay_flights_tbl
WHERE
origin
=
'SFO'
;
CREATE
OR
REPLACE
TEMP
VIEW
us_origin_airport_JFK_tmp_view
AS
SELECT
date
,
delay
,
origin
,
destination
from
us_delay_flights_tbl
WHERE
origin
=
'JFK'
Puedes conseguir lo mismo con la API del Marco de datos de la siguiente manera:
# In Python
df_sfo
=
spark
.
sql
(
"SELECT date, delay, origin, destination FROM
us_delay_flights_tbl
WHERE
origin
=
'SFO'
")
df_jfk
=
spark
.
sql
(
"SELECT date, delay, origin, destination FROM
us_delay_flights_tbl
WHERE
origin
=
'JFK'
")
# Create a temporary and global temporary view
df_sfo
.
createOrReplaceGlobalTempView
(
"us_origin_airport_SFO_global_tmp_view"
)
df_jfk
.
createOrReplaceTempView
(
"us_origin_airport_JFK_tmp_view"
)
Una vez creadas estas vistas, puedes realizar consultas sobre ellas igual que lo harías sobre una tabla. Ten en cuenta que cuando accedas a una vista temporal global debes utilizar el prefijo global_temp.<view_name>
porque Spark crea vistas temporales globales en una base de datos temporal global llamada global_temp
. Por ejemplo:
-- In SQL
SELECT
*
FROM
global_temp
.
us_origin_airport_SFO_global_tmp_view
En cambio, puedes acceder a la vista temporal normal sin el prefijo global_temp
:
-- In SQL
SELECT
*
FROM
us_origin_airport_JFK_tmp_view
// In Scala/Python
spark
.
read
.
table
(
"us_origin_airport_JFK_tmp_view"
)
// Or
spark
.
sql
(
"SELECT * FROM us_origin_airport_JFK_tmp_view"
)
También puedes soltar una vista igual que harías con una tabla:
-- In SQL
DROP
VIEW
IF
EXISTS
us_origin_airport_SFO_global_tmp_view
;
DROP
VIEW
IF
EXISTS
us_origin_airport_JFK_tmp_view
// In Scala/Python
spark
.
catalog
.
dropGlobalTempView
(
"us_origin_airport_SFO_global_tmp_view"
)
spark
.
catalog
.
dropTempView
(
"us_origin_airport_JFK_tmp_view"
)
Vistas temporales frente a vistas temporales globales
La diferencia entre vistas temporales y vistas temporales globales es sutil, pero puede ser una fuente de ligera confusión entre los desarrolladores que se inician en Spark. Una vista temporal está vinculada a un único SparkSession
dentro de una aplicación Spark. Por el contrario, una vista temporal global es visible a través de múltiples SparkSession
s dentro de una aplicación Spark. Sí, puedes crear varios SparkSession
s dentro de una única aplicación Spark; esto puede ser útil, por ejemplo, en casos en los que quieras acceder (y combinar) datos de dos SparkSession
s diferentes que no compartan las mismas configuraciones de metastore de Hive.
Ver los metadatos
Como se ha mencionado anteriormente, Spark gestiona los metadatos asociados a cada tabla gestionada o no gestionada. Esto se captura en el Catalog
una abstracción de alto nivel en Spark SQL para almacenar metadatos. La funcionalidad de Catalog
se amplió en Spark 2.x con nuevos métodos públicos que te permiten examinar los metadatos asociados a tus bases de datos, tablas y vistas. Spark 3.0 lo amplía para utilizar catalog
externo (del que hablamos brevemente en el Capítulo 12).
Por ejemplo, dentro de una aplicación Spark, después de crear la variable SparkSession
spark
, puedes acceder a todos los metadatos almacenados a través de métodos como éstos:
// In Scala/Python
spark
.
catalog
.
listDatabases
()
spark
.
catalog
.
listTables
()
spark
.
catalog
.
listColumns
(
"us_delay_flights_tbl"
)
Importa el cuaderno del repositorio GitHub del libro y pruébalo.
Almacenamiento en caché de tablas SQL
Aunque hablaremos de las estrategias de almacenamiento en caché de tablas en el próximo capítulo, merece la pena mencionar aquí que, al igual que los DataFrames, puedes almacenar en caché y eliminar de la caché tablas y vistas SQL. En Spark 3.0, además de otras opciones, puedes especificar una tabla como LAZY
, lo que significa que sólo debe almacenarse en caché cuando se utilice por primera vez, en lugar de inmediatamente:
-- In SQL
CACHE
[
LAZY
]
TABLE
<
table
-
name
>
UNCACHE
TABLE
<
table
-
name
>
Leer tablas en marcos de datos
A menudo, los ingenieros de datos construyen canalizaciones de datos como parte de sus procesos habituales de ingestión de datos y ETL. Rellenan bases de datos y tablas Spark SQL con datos depurados para que los consuman las aplicaciones posteriores.
Supongamos que tienes una base de datos existente, learn_spark_db
, y una tabla, us_delay_flights_tbl
, lista para usar. En lugar de leer de un archivo JSON externo, puedes utilizar simplemente SQL para consultar la tabla y asignar el resultado devuelto a un DataFrame:
// In Scala
val
usFlightsDF
=
spark
.
sql
(
"SELECT * FROM us_delay_flights_tbl"
)
val
usFlightsDF2
=
spark
.
table
(
"us_delay_flights_tbl"
)
# In Python
us_flights_df
=
spark
.
sql
(
"SELECT * FROM us_delay_flights_tbl"
)
us_flights_df2
=
spark
.
table
(
"us_delay_flights_tbl"
)
Ahora tienes un DataFrame depurado leído de una tabla SQL existente en Spark. También puedes leer datos en otros formatos utilizando las fuentes de datos incorporadas de Spark, lo que te da flexibilidad para interactuar con varios formatos de archivo comuness.
Fuentes de datos para marcos de datos y tablas SQL
Como se muestra en la Figura 4-1, Spark SQL proporciona una interfaz para diversas fuentes de datos. También proporciona un conjunto de métodos comunes para leer y escribir datos en y desde estas fuentes de datos utilizando la API de Fuentes de Datos.
En esta sección cubriremos algunas de las fuentes de datos incorporadas, los formatos de archivo disponibles y las formas de cargar y escribir datos, junto con opciones específicas relativas a estas fuentes de datos. Pero antes, veamos más de cerca dos construcciones de alto nivel de la API de fuentes de datos que dictan la forma en que interactúas con las distintas fuentes de datos: DataFrameReader
y DataFrameWriter
.
LectorDeDatos
DataFrameReader
es la construcción básica para leer datos de una fuente de datos en un Marco de datos. Tiene un formato definido y un patrón de uso recomendado:
DataFrameReader.format(args).option("key", "value").schema(args).load()
Este patrón de encadenar métodos es común en Spark, y fácil de leer. Lo vimos en el Capítulo 3 cuando exploramos los patrones habituales de análisis de datos.
Ten en cuenta que sólo puedes acceder a un DataFrameReader
a través de una instancia de SparkSession
. Es decir, no puedes crear una instancia de DataFrameReader
. Para obtener un manejador de instancia de él, utiliza:
SparkSession.read // or SparkSession.readStream
Mientras que read
devuelve un "handle" a DataFrameReader
para leer en un DataFrame desde una fuente de datos estática, readStream
devuelve una instancia para leer desde una fuente de streaming. (Trataremos el Streaming Estructurado más adelante en el libro).
Los argumentos de cada uno de los métodos públicos de DataFrameReader
toman distintos valores. La Tabla 4-1 los enumera, con un subconjunto de los argumentos admitidos .
Método | Argumentos | Descripción |
---|---|---|
format() |
"parquet" , "csv" , "txt" , "json" , "jdbc" , "orc" , "avro" , etc. |
Si no especificas este método, entonces el predeterminado es Parquet o el que se establezca en spark.sql.sources.default . |
option() |
("mode", {PERMISSIVE | FAILFAST | DROPMALFORMED } ) ("inferSchema", {true | false}) ("path", "path_file_data_source") |
Una serie de pares clave/valor y opciones. La documentación de Spark muestra algunos ejemplos y explica los distintos modos y sus acciones. El modo por defecto es PERMISSIVE . Las opciones "inferSchema" y "mode" son específicas de los formatos de archivo JSON y CSV. |
schema() |
DDL String o StructType , por ejemplo, 'A INT, B STRING' oStructType(...) |
Para los formatos JSON o CSV, puedes especificar que se infiera el esquema en el método option() . En general, proporcionar un esquema para cualquier formato hace que la carga sea más rápida y garantiza que tus datos se ajusten al esquema esperado. |
load() |
"/path/to/data/source" |
La ruta a la fuente de datos. Puede estar vacía si se especifica en option("path", "...") . |
Aunque no vamos a enumerar exhaustivamente todas las diferentes combinaciones de argumentos y opciones, la documentación de Python, Scala, R y Java ofrece sugerencias y orientación. No obstante, merece la pena mostrar un par de ejemplos:
// In Scala
// Use Parquet
val
file
=
"""/databricks-datasets/learning-spark-v2/flights/summary-
data/parquet/2010-summary.parquet"""
val
df
=
spark
.
read
.
format
(
"parquet"
).
load
(
file
)
// Use Parquet; you can omit format("parquet") if you wish as it's the default
val
df2
=
spark
.
read
.
load
(
file
)
// Use CSV
val
df3
=
spark
.
read
.
format
(
"csv"
)
.
option
(
"inferSchema"
,
"true"
)
.
option
(
"header"
,
"true"
)
.
option
(
"mode"
,
"PERMISSIVE"
)
.
load
(
"/databricks-datasets/learning-spark-v2/flights/summary-data/csv/*"
)
// Use JSON
val
df4
=
spark
.
read
.
format
(
"json"
)
.
load
(
"/databricks-datasets/learning-spark-v2/flights/summary-data/json/*"
)
Nota
En general, no se necesita ningún esquema cuando se lee de una fuente de datos Parquet estática: los metadatos Parquet suelen contener el esquema, por lo que se infiere. Sin embargo, para las fuentes de datos en streaming tendrás que proporcionar un esquema. (Trataremos la lectura de fuentes de datos en streaming en el Capítulo 8).
Parquet es la fuente de datos predeterminada y preferida de Spark porque es eficiente, utiliza almacenamiento columnar y emplea un rápido algoritmo de compresión. Verás ventajas adicionales más adelante (como el pushdown columnar), cuando tratemos el optimizador Catalyst en mayor profundidad.
EscritorDeDatos
DataFrameWriter
hace lo contrario que su homólogo: guarda o escribe datos en una fuente de datos incorporada especificada. A diferencia de DataFrameReader
, no accedes a su instancia desde SparkSession
, sino desde el DataFrame que deseas guardar. Tiene algunos patrones de uso recomendados:
DataFrameWriter.format(args) .option(args) .bucketBy(args) .partitionBy(args) .save(path) DataFrameWriter.format(args).option(args).sortBy(args).saveAsTable(table)
Para obtener un manejador de instancia, utiliza
DataFrame.write // or DataFrame.writeStream
Los argumentos de cada uno de los métodos a DataFrameWriter
también toman valores diferentes. Los enumeramos en la Tabla 4-2, con un subconjunto de los argumentos admitidos .
Método | Argumentos | Descripción |
---|---|---|
format() |
"parquet" , "csv" , "txt" , "json" , "jdbc" , "orc" , "avro" , etc. |
Si no especificas este método, entonces el predeterminado es Parquet o lo que se establezca en spark.sql.sources.default . |
option() |
("mode", {append | overwrite | ignore | error or errorifexists} ) ("mode", {SaveMode.Overwrite | SaveMode.Append, SaveMode.Ignore, SaveMode.ErrorIfExists}) ("path", "path_to_write_to") |
Una serie de pares clave/valor y opciones. La documentación de Spark muestra algunos ejemplos. Se trata de un método sobrecargado. Las opciones de modo por defecto son error or errorifexists y SaveMode.ErrorIfExists ; lanzan una excepción en tiempo de ejecución si los datos ya existen. |
bucketBy() |
(numBuckets, col, col..., coln) |
El número de cubos y los nombres de las columnas por los que hacer los cubos. Utiliza el esquema de bucketing de Hive en un sistema de archivos. |
save()
|
"/path/to/data/source" |
La ruta en la que guardar. Puede estar vacía si se especifica en option("path", "...") . |
saveAsTable() |
"table_name" |
La tabla en la que guardar. |
He aquí un breve fragmento de ejemplo para ilustrar el uso de métodos y argumentos:
// In Scala
// Use JSON
val
location
=
...
df
.
write
.
format
(
"json"
).
mode
(
"overwrite"
).
save
(
location
)
Parquet
Comenzaremos nuestra exploración de las fuentes de datos con Parquet, porque es la fuente de datos por defecto en Spark. Apoyado y ampliamente utilizado por muchos marcos y plataformas de procesamiento de big data, Parquet es un formato de archivo columnar de código abierto que ofrece muchas optimizaciones de E/S (como la compresión, que ahorra espacio de almacenamiento y permite un acceso rápido a las columnas de datos).
Debido a su eficacia y a estas optimizaciones, te recomendamos que después de haber transformado y limpiado tus datos, guardes tus DataFrames en formato Parquet para su consumo posterior. (Parquet es también el formato de apertura de tablas por defecto de Delta Lake, del que hablaremos en el Capítulo 9).
Lectura de archivos Parquet en un DataFrame
Los archivos Parquet se almacenan en una estructura de directorios que contiene los archivos de datos, los metadatos, una serie de archivos comprimidos y algunos archivos de estado. Los metadatos del pie de página contienen la versión del formato de archivo, el esquema y datos de columna como la ruta, etc.
Por ejemplo, un directorio de un archivo Parquet puede contener un conjunto de archivos como éste:
_SUCCESS _committed_1799640464332036264 _started_1799640464332036264 part-00000-tid-1799640464332036264-91273258-d7ef-4dc7-<...>-c000.snappy.parquet
Puede haber varios archivos comprimidos en parte-XXXX en un directorio (los nombres que se muestran aquí se han acortado para que quepan en la página).
Para leer archivos Parquet en un Marco de datos, sólo tienes que especificar el formato y la ruta:
// In Scala
val
file
=
"""/databricks-datasets/learning-spark-v2/flights/summary-data/
parquet/2010-summary.parquet/"""
val
df
=
spark
.
read
.
format
(
"parquet"
).
load
(
file
)
# In Python
file
=
"""/databricks-datasets/learning-spark-v2/flights/summary-data/parquet/
2010-summary.parquet/"""
df
=
spark
.
read
.
format
(
"parquet"
)
.
load
(
file
)
A menos que estés leyendo de una fuente de datos en streaming, no es necesario que proporciones el esquema, porque Parquet lo guarda como parte de sus metadatos.
Lectura de archivos Parquet en una tabla SQL de Spark
Además de leer archivos Parquet en un Spark DataFrame, también puedes crear una tabla o vista no gestionada Spark SQL directamente utilizando SQL:
-- In SQL
CREATE
OR
REPLACE
TEMPORARY
VIEW
us_delay_flights_tbl
USING
parquet
OPTIONS
(
path
"/databricks-datasets/learning-spark-v2/flights/summary-data/parquet/
2010-summary.parquet/"
)
Una vez que hayas creado la tabla o vista, puedes leer datos en un Marco de datos utilizando SQL, como vimos en algunos ejemplos anteriores:
// In Scala
spark
.
sql
(
"SELECT * FROM us_delay_flights_tbl"
).
show
()
# In Python
spark
.
sql
(
"SELECT * FROM us_delay_flights_tbl"
)
.
show
()
Ambas operaciones devuelven los mismos resultados:
+-----------------+-------------------+-----+ |DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count| +-----------------+-------------------+-----+ |United States |Romania |1 | |United States |Ireland |264 | |United States |India |69 | |Egypt |United States |24 | |Equatorial Guinea|United States |1 | |United States |Singapore |25 | |United States |Grenada |54 | |Costa Rica |United States |477 | |Senegal |United States |29 | |United States |Marshall Islands |44 | +-----------------+-------------------+-----+ only showing top 10 rows
Escribir DataFrames en archivos Parquet
Escribir o guardar un DataFrame como tabla o archivo es una operación habitual en Spark. Para escribir un DataFrame sólo tienes que utilizar los métodos y argumentos de DataFrameWriter
descritos anteriormente en este capítulo, proporcionando la ubicación en la que guardar los archivos Parquet. Por ejemplo
// In Scala
df
.
write
.
format
(
"parquet"
)
.
mode
(
"overwrite"
)
.
option
(
"compression"
,
"snappy"
)
.
save
(
"/tmp/data/parquet/df_parquet"
)
# In Python
(
df
.
write
.
format
(
"parquet"
)
.
mode
(
"overwrite"
)
.
option
(
"compression"
,
"snappy"
)
.
save
(
"/tmp/data/parquet/df_parquet"
))
Nota
Recuerda que Parquet es el formato de archivo por defecto. Si no incluyes el método format()
, el DataFrame se seguirá guardando como un archivo Parquet.
Esto creará un conjunto de archivos Parquet compactos y comprimidos en la ruta especificada. Como aquí hemos utilizado snappy como opción de compresión, tendremos archivos comprimidos snappy. Por brevedad, este ejemplo sólo ha generado un archivo; normalmente, puede haber una docena de archivos creados:
-rw-r--r-- 1 jules wheel 0 May 19 10:58 _SUCCESS -rw-r--r-- 1 jules wheel 966 May 19 10:58 part-00000-<...>-c000.snappy.parquet
Escribir DataFrames en tablas SQL de Spark
Escribir un DataFrame en una tabla SQL es tan fácil como escribir en un archivo: sólo tienes que utilizar saveAsTable()
en lugar de save()
. Esto creará una tabla gestionada llamada us_delay_flights_tbl
:
// In Scala
df
.
write
.
mode
(
"overwrite"
)
.
saveAsTable
(
"us_delay_flights_tbl"
)
# In Python
(
df
.
write
.
mode
(
"overwrite"
)
.
saveAsTable
(
"us_delay_flights_tbl"
))
En resumen, Parquet es el formato de archivo de origen de datos incorporado preferido y por defecto en Spark, y ha sido adoptado por muchos otros frameworks. Te recomendamos que utilices este formato en tus procesos ETL y de ingestión de datos.
JSON
La Notación de Objetos JavaScript (JSON) también es un formato de datos popular. Saltó a la fama por ser un formato fácil de leer y de descifrar en comparación con XML. Tiene dos formatos de representación el modo de una sola línea y el modo de varias líneas. Spark admite ambos modos.
En el modo de una sola línea , cada línea denota un único objeto JSON, mientras que en el modo multilínea, todo el objeto multilínea constituye un único objeto JSON. Para leer en este modo, establece multiLine
a true en el método option()
.
Leer un archivo JSON en un DataFrame
Puedes leer un archivo JSON en un DataFrame del mismo modo que lo hacías con Parquet: sólo tienes que especificar "json"
en el método format()
:
// In Scala
val
file
=
"/databricks-datasets/learning-spark-v2/flights/summary-data/json/*"
val
df
=
spark
.
read
.
format
(
"json"
).
load
(
file
)
# In Python
file
=
"/databricks-datasets/learning-spark-v2/flights/summary-data/json/*"
df
=
spark
.
read
.
format
(
"json"
)
.
load
(
file
)
Lectura de un archivo JSON en una tabla SQL de Spark
También puedes crear una tabla SQL a partir de un archivo JSON, igual que hiciste con Parquet:
-- In SQL
CREATE
OR
REPLACE
TEMPORARY
VIEW
us_delay_flights_tbl
USING
json
OPTIONS
(
path
"/databricks-datasets/learning-spark-v2/flights/summary-data/json/*"
)
Una vez creada la tabla, puedes leer los datos en un Marco de datos utilizando SQL:
//
In
Scala
/
Python
spark
.
sql
(
"SELECT * FROM us_delay_flights_tbl"
)
.
show
()
+-----------------+-------------------+-----+
|
DEST_COUNTRY_NAME
|
ORIGIN_COUNTRY_NAME
|
count
|
+-----------------+-------------------+-----+
|
United
States
|
Romania
|
15
|
|
United
States
|
Croatia
|
1
|
|
United
States
|
Ireland
|
344
|
|
Egypt
|
United
States
|
15
|
|
United
States
|
India
|
62
|
|
United
States
|
Singapore
|
1
|
|
United
States
|
Grenada
|
62
|
|
Costa
Rica
|
United
States
|
588
|
|
Senegal
|
United
States
|
40
|
|
Moldova
|
United
States
|
1
|
+-----------------+-------------------+-----+
only
showing
top
10
rows
Escribir DataFrames en archivos JSON
Guardar un DataFrame como archivo JSON es sencillo. Especifica los métodos y argumentos DataFrameWriter
y proporciona la ubicación en la que guardar los archivos JSON:
// In Scala
df
.
write
.
format
(
"json"
)
.
mode
(
"overwrite"
)
.
option
(
"compression"
,
"snappy"
)
.
save
(
"/tmp/data/json/df_json"
)
# In Python
(
df
.
write
.
format
(
"json"
)
.
mode
(
"overwrite"
)
.
option
(
"compression"
,
"snappy"
)
.
save
(
"/tmp/data/json/df_json"
))
Crea un directorio en la ruta especificada con un conjunto de archivos JSON compactos:
-rw-r--r-- 1 jules wheel 0 May 16 14:44 _SUCCESS -rw-r--r-- 1 jules wheel 71 May 16 14:44 part-00000-<...>-c000.json
Opciones de la fuente de datos JSON
La Tabla 4-3 describe las opciones JSON habituales para DataFrameReader
y DataFrameWriter
. Para obtener una lista completa, te remitimos a la documentación .
Nombre de la propiedad | Valores | Significado | Alcance |
---|---|---|---|
compression |
none , uncompressed , bzip2 , deflate , gzip , lz4 , o snappy |
Utiliza este códec de compresión para escribir. Ten en cuenta que la lectura sólo detectará la compresión o el códec a partir de la extensión del archivo. | Escribe |
dateFormat |
yyyy-MM-dd o DateTimeFormatter |
Utiliza este formato o cualquier formato de la página de Java DateTimeFormatter . |
Lectura/escritura |
multiLine |
true , false |
Utiliza el modo multilínea. Por defecto es false (modo de una sola línea). |
Leer |
allowUnquotedFieldNames |
true , false |
Permitir nombres de campo JSON sin entrecomillar. Por defecto es false . |
Leer |
CSV
Tan utilizado como los archivos de texto sin formato, este formato de archivo de texto común captura cada dato o campo delimitado por una coma; cada línea con campos separados por comas representa un registro. Aunque la coma es el separador por defecto, puedes utilizar otros delimitadores para separar campos en los casos en que las comas formen parte de tus datos. Las hojas de cálculo más populares pueden generar archivos CSV, por lo que es un formato popular entre los analistas de datos y de negocio.
Leer un archivo CSV en un DataFrame
Al igual que con las otras fuentes de datos incorporadas, puedes utilizar los métodos y argumentos de DataFrameReader
para leer un archivo CSV en un DataFrame:
// In Scala
val
file
=
"/databricks-datasets/learning-spark-v2/flights/summary-data/csv/*"
val
schema
=
"DEST_COUNTRY_NAME STRING, ORIGIN_COUNTRY_NAME STRING, count INT"
val
df
=
spark
.
read
.
format
(
"csv"
)
.
schema
(
schema
)
.
option
(
"header"
,
"true"
)
.
option
(
"mode"
,
"FAILFAST"
)
// Exit if any errors
.
option
(
"nullValue"
,
""
)
// Replace any null data with quotes
.
load
(
file
)
# In Python
file
=
"/databricks-datasets/learning-spark-v2/flights/summary-data/csv/*"
schema
=
"DEST_COUNTRY_NAME STRING, ORIGIN_COUNTRY_NAME STRING, count INT"
df
=
(
spark
.
read
.
format
(
"csv"
)
.
option
(
"header"
,
"true"
)
.
schema
(
schema
)
.
option
(
"mode"
,
"FAILFAST"
)
# Exit if any errors
.
option
(
"nullValue"
,
""
)
# Replace any null data field with quotes
.
load
(
file
))
Lectura de un archivo CSV en una tabla SQL de Spark
Crear una tabla SQL a partir de una fuente de datos CSV no difiere de utilizar Parquet o JSON:
-- In SQL
CREATE
OR
REPLACE
TEMPORARY
VIEW
us_delay_flights_tbl
USING
csv
OPTIONS
(
path
"/databricks-datasets/learning-spark-v2/flights/summary-data/csv/*"
,
header
"true"
,
inferSchema
"true"
,
mode
"FAILFAST"
)
Una vez creada la tabla, puedes leer los datos en un Marco de datos utilizando SQL como antes:
//
In
Scala
/
Python
spark
.
sql
(
"SELECT * FROM us_delay_flights_tbl"
)
.
show
(
10
)
+-----------------+-------------------+-----+
|
DEST_COUNTRY_NAME
|
ORIGIN_COUNTRY_NAME
|
count
|
+-----------------+-------------------+-----+
|
United
States
|
Romania
|
1
|
|
United
States
|
Ireland
|
264
|
|
United
States
|
India
|
69
|
|
Egypt
|
United
States
|
24
|
|
Equatorial
Guinea
|
United
States
|
1
|
|
United
States
|
Singapore
|
25
|
|
United
States
|
Grenada
|
54
|
|
Costa
Rica
|
United
States
|
477
|
|
Senegal
|
United
States
|
29
|
|
United
States
|
Marshall
Islands
|
44
|
+-----------------+-------------------+-----+
only
showing
top
10
rows
Escribir DataFrames en archivos CSV
Guardar un DataFrame como archivo CSV es sencillo. Especifica los métodos y argumentos apropiados de DataFrameWriter
, y proporciona la ubicación en la que guardar los archivos CSV:
// In Scala
df
.
write
.
format
(
"csv"
).
mode
(
"overwrite"
).
save
(
"/tmp/data/csv/df_csv"
)
# In Python
df
.
write
.
format
(
"csv"
)
.
mode
(
"overwrite"
)
.
save
(
"/tmp/data/csv/df_csv"
)
Esto genera una carpeta en la ubicación especificada, poblada con un montón de archivos comprimidos y compactos:
-rw-r--r-- 1 jules wheel 0 May 16 12:17 _SUCCESS -rw-r--r-- 1 jules wheel 36 May 16 12:17 part-00000-251690eb-<...>-c000.csv
Opciones de fuente de datos CSV
La Tabla 4-4 describe algunas de las opciones comunes de CSV para DataFrameReader
y DataFrameWriter
. Como los archivos CSV pueden ser complejos, hay muchas opciones disponibles; para obtener una lista completa te remitimos a la documentación .
Nombre de la propiedad | Valores | Significado | Alcance |
---|---|---|---|
compression |
none , bzip2 , deflate , gzip , lz4 , o snappy |
Utiliza este códec de compresión para escribir. | Escribe |
dateFormat |
yyyy-MM-dd o DateTimeFormatter |
Utiliza este formato o cualquier formato de la página de Java DateTimeFormatter . |
Lectura/escritura |
multiLine |
true , false |
Utiliza el modo multilínea. Por defecto es false (modo de una sola línea). |
Leer |
inferSchema |
true , false |
Si true , Spark determinará los tipos de datos de las columnas. Por defecto es false . |
Leer |
sep |
Cualquier personaje | Utiliza este carácter para separar los valores de las columnas de una fila. El delimitador por defecto es una coma (, ). |
Lectura/escritura |
escape |
Cualquier personaje | Utiliza este carácter para escapar de las comillas. Por defecto es \ . |
Lectura/escritura |
header |
true , false |
Indica si la primera línea es una cabecera que indica el nombre de cada columna. Por defecto es false . |
Lectura/escritura |
Avro
Introducido en Spark 2.4 como fuente de datos incorporada, el formato Avro es utilizado, por ejemplo, por Apache Kafka para serializar y deserializar mensajes. Ofrece muchas ventajas, como el mapeo directo a JSON, velocidad y eficiencia, y enlaces disponibles para muchos lenguajes de programación.
Leer un archivo Avro en un DataFrame
La lectura de un archivo Avro en un DataFrame utilizando DataFrameReader
es coherente en su uso con las otras fuentes de datos que hemos tratado en esta sección:
// In Scala
val
df
=
spark
.
read
.
format
(
"avro"
)
.
load
(
"/databricks-datasets/learning-spark-v2/flights/summary-data/avro/*"
)
df
.
show
(
false
)
# In Python
df
=
(
spark
.
read
.
format
(
"avro"
)
.
load
(
"/databricks-datasets/learning-spark-v2/flights/summary-data/avro/*"
))
df
.
show
(
truncate
=
False
)
+-----------------+-------------------+-----+
|
DEST_COUNTRY_NAME
|
ORIGIN_COUNTRY_NAME
|
count
|
+-----------------+-------------------+-----+
|
United
States
|
Romania
|
1
|
|
United
States
|
Ireland
|
264
|
|
United
States
|
India
|
69
|
|
Egypt
|
United
States
|
24
|
|
Equatorial
Guinea
|
United
States
|
1
|
|
United
States
|
Singapore
|
25
|
|
United
States
|
Grenada
|
54
|
|
Costa
Rica
|
United
States
|
477
|
|
Senegal
|
United
States
|
29
|
|
United
States
|
Marshall
Islands
|
44
|
+-----------------+-------------------+-----+
only
showing
top
10
rows
Lectura de un archivo Avro en una tabla SQL de Spark
De nuevo, crear tablas SQL utilizando una fuente de datos Avro no difiere de utilizar Parquet, JSON o CSV:
-- In SQL
CREATE
OR
REPLACE
TEMPORARY
VIEW
episode_tbl
USING
avro
OPTIONS
(
path
"/databricks-datasets/learning-spark-v2/flights/summary-data/avro/*"
)
Una vez que hayas creado una tabla, puedes leer datos en un Marco de datos utilizando SQL:
// In Scala
spark
.
sql
(
"SELECT * FROM episode_tbl"
).
show
(
false
)
# In Python
spark
.
sql
(
"SELECT * FROM episode_tbl"
)
.
show
(
truncate
=
False
)
+-----------------+-------------------+-----+
|
DEST_COUNTRY_NAME
|
ORIGIN_COUNTRY_NAME
|
count
|
+-----------------+-------------------+-----+
|
United
States
|
Romania
|
1
|
|
United
States
|
Ireland
|
264
|
|
United
States
|
India
|
69
|
|
Egypt
|
United
States
|
24
|
|
Equatorial
Guinea
|
United
States
|
1
|
|
United
States
|
Singapore
|
25
|
|
United
States
|
Grenada
|
54
|
|
Costa
Rica
|
United
States
|
477
|
|
Senegal
|
United
States
|
29
|
|
United
States
|
Marshall
Islands
|
44
|
+-----------------+-------------------+-----+
only
showing
top
10
rows
Escribir DataFrames en archivos Avro
Escribir un Marco de datos como archivo Avro es sencillo. Como de costumbre, especifica los métodos y argumentos apropiados de DataFrameWriter
, y proporciona la ubicación en la que guardar los archivos Avro:
// In Scala
df
.
write
.
format
(
"avro"
)
.
mode
(
"overwrite"
)
.
save
(
"/tmp/data/avro/df_avro"
)
# In Python
(
df
.
write
.
format
(
"avro"
)
.
mode
(
"overwrite"
)
.
save
(
"/tmp/data/avro/df_avro"
))
Esto genera una carpeta en la ubicación especificada, poblada con un montón de archivos comprimidos y compactos:
-rw-r--r-- 1 jules wheel 0 May 17 11:54 _SUCCESS -rw-r--r-- 1 jules wheel 526 May 17 11:54 part-00000-ffdf70f4-<...>-c000.avro
Opciones de fuente de datos Avro
En la Tabla 4-5 se describen las opciones habituales de DataFrameReader
y DataFrameWriter
. Encontrarás una lista completa de opciones en la documentación .
Nombre de la propiedad | Valor por defecto | Significado | Alcance |
---|---|---|---|
avroSchema |
Ninguno | Esquema Avro opcional proporcionado por un usuario en formato JSON. El tipo de datos y la denominación de los campos del registro deben coincidir con los datos Avro de entrada o con los datos Catalyst (tipo de datos interno de Spark), de lo contrario la acción de lectura/escritura fallará. | Lectura/escritura |
recordName |
topLevelRecord |
Nombre del registro de nivel superior en el resultado de la escritura, que se requiere en la especificación Avro. | Escribe |
recordNamespace |
"" |
Registra el espacio de nombres en el resultado de la escritura. | Escribe |
ignoreExtension |
true |
Si esta opción está activada, se cargan todos los archivos (con y sin la extensión .avro ). De lo contrario, se ignoran los archivos sin la extensión .avro. | Leer |
compression |
snappy |
Te permite especificar el códec de compresión a utilizar en la escritura. Los códecs admitidos actualmente son uncompressed , snappy , deflate , bzip2 y xz .Si no se establece esta opción, se tiene en cuenta el valor de spark.sql.avro.compression.codec . |
Escribe |
ORC
Como formato de archivo columnar optimizado adicional, Spark 2.x admite un lector ORC vectorizado. Dos configuraciones de Spark dictan qué implementación de ORC utilizar. Cuando spark.sql.orc.impl
está configurado como native
y spark.sql.orc.enableVectorizedReader
como true
, Spark utiliza el lector ORC vectorizado. Un lector vectorizado lee bloques de filas (a menudo 1.024 por bloque) en lugar de una fila cada vez, lo que agiliza las operaciones y reduce el uso de CPU en operaciones intensivas como escaneos, filtros, agregaciones y uniones.
Para las tablas Hive ORC SerDe (serialización y deserialización) creadas con el comando SQL USING HIVE OPTIONS (fileFormat 'ORC')
, se utiliza el lector vectorizado cuando el parámetro de configuración de Spark spark.sql.hive.convertMetastoreOrc
se establece en true
.
Leer un archivo ORC en un Marco de datos
Para leer en un DataFrame utilizando el lector vectorizado ORC, sólo tienes que utilizar los métodos y opciones normales de DataFrameReader
:
// In Scala
val
file
=
"/databricks-datasets/learning-spark-v2/flights/summary-data/orc/*"
val
df
=
spark
.
read
.
format
(
"orc"
).
load
(
file
)
df
.
show
(
10
,
false
)
# In Python
file
=
"/databricks-datasets/learning-spark-v2/flights/summary-data/orc/*"
df
=
spark
.
read
.
format
(
"orc"
)
.
option
(
"path"
,
file
)
.
load
()
df
.
show
(
10
,
False
)
+-----------------+-------------------+-----+
|
DEST_COUNTRY_NAME
|
ORIGIN_COUNTRY_NAME
|
count
|
+-----------------+-------------------+-----+
|
United
States
|
Romania
|
1
|
|
United
States
|
Ireland
|
264
|
|
United
States
|
India
|
69
|
|
Egypt
|
United
States
|
24
|
|
Equatorial
Guinea
|
United
States
|
1
|
|
United
States
|
Singapore
|
25
|
|
United
States
|
Grenada
|
54
|
|
Costa
Rica
|
United
States
|
477
|
|
Senegal
|
United
States
|
29
|
|
United
States
|
Marshall
Islands
|
44
|
+-----------------+-------------------+-----+
only
showing
top
10
rows
Lectura de un archivo ORC en una tabla SQL de Spark
No hay ninguna diferencia con Parquet, JSON, CSV o Avro a la hora de crear una vista SQL utilizando una fuente de datos ORC:
-- In SQL
CREATE
OR
REPLACE
TEMPORARY
VIEW
us_delay_flights_tbl
USING
orc
OPTIONS
(
path
"/databricks-datasets/learning-spark-v2/flights/summary-data/orc/*"
)
Una vez creada una tabla, puedes leer datos en un Marco de datos utilizando SQL como de costumbre:
//
In
Scala
/
Python
spark
.
sql
(
"SELECT * FROM us_delay_flights_tbl"
)
.
show
()
+-----------------+-------------------+-----+
|
DEST_COUNTRY_NAME
|
ORIGIN_COUNTRY_NAME
|
count
|
+-----------------+-------------------+-----+
|
United
States
|
Romania
|
1
|
|
United
States
|
Ireland
|
264
|
|
United
States
|
India
|
69
|
|
Egypt
|
United
States
|
24
|
|
Equatorial
Guinea
|
United
States
|
1
|
|
United
States
|
Singapore
|
25
|
|
United
States
|
Grenada
|
54
|
|
Costa
Rica
|
United
States
|
477
|
|
Senegal
|
United
States
|
29
|
|
United
States
|
Marshall
Islands
|
44
|
+-----------------+-------------------+-----+
only
showing
top
10
rows
Escribir DataFrames en archivos ORC
Volver a escribir un DataFrame transformado después de leerlo es igualmente sencillo utilizando los métodos DataFrameWriter
métodos:
// In Scala
df
.
write
.
format
(
"orc"
)
.
mode
(
"overwrite"
)
.
option
(
"compression"
,
"snappy"
)
.
save
(
"/tmp/data/orc/df_orc"
)
# In Python
(
df
.
write
.
format
(
"orc"
)
.
mode
(
"overwrite"
)
.
option
(
"compression"
,
"snappy"
)
.
save
(
"/tmp/data/orc/flights_orc"
))
El resultado será una carpeta en la ubicación especificada que contendrá algunos archivos ORC comprimidos:
-rw-r--r-- 1 jules wheel 0 May 16 17:23 _SUCCESS -rw-r--r-- 1 jules wheel 547 May 16 17:23 part-00000-<...>-c000.snappy.orc
Imágenes
En Spark 2.4, la comunidad introdujo una nueva fuente de datos, los archivos de imagen, para dar soporte a marcos de aprendizaje profundo y aprendizaje automático como TensorFlow y PyTorch. Para las aplicaciones de aprendizaje automático basadas en la visión por ordenador, es importante cargar y procesar conjuntos de datos de imágenes.
Leer un archivo de imagen en un DataFrame
Como con todos los formatos de archivo anteriores, puedes utilizar los métodos y opciones de DataFrameReader
para leer un archivo de imagen, como se muestra aquí:
// In Scala
import
org
.
apache
.
spark
.
ml
.
source
.
image
val
imageDir
=
"/databricks-datasets/learning-spark-v2/cctvVideos/train_images/"
val
imagesDF
=
spark
.
read
.
format
(
"image"
).
load
(
imageDir
)
imagesDF
.
printSchema
imagesDF
.
select
(
"image.height"
,
"image.width"
,
"image.nChannels"
,
"image.mode"
,
"label"
).
show
(
5
,
false
)
# In Python
from
pyspark.ml
import
image
image_dir
=
"/databricks-datasets/learning-spark-v2/cctvVideos/train_images/"
images_df
=
spark
.
read
.
format
(
"image"
)
.
load
(
image_dir
)
images_df
.
printSchema
()
root
|--
image
:
struct
(
nullable
=
true
)
|
|--
origin
:
string
(
nullable
=
true
)
|
|--
height
:
integer
(
nullable
=
true
)
|
|--
width
:
integer
(
nullable
=
true
)
|
|--
nChannels
:
integer
(
nullable
=
true
)
|
|--
mode
:
integer
(
nullable
=
true
)
|
|--
data
:
binary
(
nullable
=
true
)
|--
label
:
integer
(
nullable
=
true
)
images_df
.
select
(
"image.height"
,
"image.width"
,
"image.nChannels"
,
"image.mode"
,
"label"
)
.
show
(
5
,
truncate
=
False
)
+------+-----+---------+----+-----+
|
height
|
width
|
nChannels
|
mode
|
label
|
+------+-----+---------+----+-----+
|
288
|
384
|
3
|
16
|
0
|
|
288
|
384
|
3
|
16
|
1
|
|
288
|
384
|
3
|
16
|
0
|
|
288
|
384
|
3
|
16
|
0
|
|
288
|
384
|
3
|
16
|
0
|
+------+-----+---------+----+-----+
only
showing
top
5
rows
Archivos binarios
Spark 3.0 añade soporte para archivos binarios como fuente de datos. La página DataFrameReader
convierte cada archivo binario en una única fila (registro) DataFrame que contiene el contenido bruto y los metadatos del archivo. La fuente de datos de archivos binarios produce un DataFrame con las siguientes columnas:
camino: StringType
modificationTime: TimestampType
longitud: TipoLargo
contenido: TipoBinario
Leer un archivo binario en un DataFrame
Para leer archivos binarios, especifica el formato de la fuente de datos como binaryFile
. Puedes cargar archivos con rutas que coincidan con un patrón global dado, conservando el comportamiento del descubrimiento de particiones con la opción de fuente de datos pathGlobFilter
. Por ejemplo, el siguiente código lee todos los archivos JPG del directorio de entrada con cualquier directorio particionado:
// In Scala
val
path
=
"/databricks-datasets/learning-spark-v2/cctvVideos/train_images/"
val
binaryFilesDF
=
spark
.
read
.
format
(
"binaryFile"
)
.
option
(
"pathGlobFilter"
,
"*.jpg"
)
.
load
(
path
)
binaryFilesDF
.
show
(
5
)
# In Python
path
=
"/databricks-datasets/learning-spark-v2/cctvVideos/train_images/"
binary_files_df
=
(
spark
.
read
.
format
(
"binaryFile"
)
.
option
(
"pathGlobFilter"
,
"*.jpg"
)
.
load
(
path
))
binary_files_df
.
show
(
5
)
+--------------------+-------------------+------+--------------------+-----+
|
path
|
modificationTime
|
length
|
content
|
label
|
+--------------------+-------------------+------+--------------------+-----+
|
file
:
/
Users
/
jules
...|
2020
-
02
-
12
12
:
04
:
24
|
55037
|
[
FF
D8
FF
E0
00
1.
..|
0
|
|
file
:
/
Users
/
jules
...|
2020
-
02
-
12
12
:
04
:
24
|
54634
|
[
FF
D8
FF
E0
00
1.
..|
1
|
|
file
:
/
Users
/
jules
...|
2020
-
02
-
12
12
:
04
:
24
|
54624
|
[
FF
D8
FF
E0
00
1.
..|
0
|
|
file
:
/
Users
/
jules
...|
2020
-
02
-
12
12
:
04
:
24
|
54505
|
[
FF
D8
FF
E0
00
1.
..|
0
|
|
file
:
/
Users
/
jules
...|
2020
-
02
-
12
12
:
04
:
24
|
54475
|
[
FF
D8
FF
E0
00
1.
..|
0
|
+--------------------+-------------------+------+--------------------+-----+
only
showing
top
5
rows
Para ignorar el descubrimiento de datos de partición en un directorio, puedes configurar recursiveFileLookup
en "true"
:
// In Scala
val
binaryFilesDF
=
spark
.
read
.
format
(
"binaryFile"
)
.
option
(
"pathGlobFilter"
,
"*.jpg"
)
.
option
(
"recursiveFileLookup"
,
"true"
)
.
load
(
path
)
binaryFilesDF
.
show
(
5
)
# In Python
binary_files_df
=
(
spark
.
read
.
format
(
"binaryFile"
)
.
option
(
"pathGlobFilter"
,
"*.jpg"
)
.
option
(
"recursiveFileLookup"
,
"true"
)
.
load
(
path
))
binary_files_df
.
show
(
5
)
+--------------------+-------------------+------+--------------------+
|
path
|
modificationTime
|
length
|
content
|
+--------------------+-------------------+------+--------------------+
|
file
:
/
Users
/
jules
...|
2020
-
02
-
12
12
:
04
:
24
|
55037
|
[
FF
D8
FF
E0
00
1.
..|
|
file
:
/
Users
/
jules
...|
2020
-
02
-
12
12
:
04
:
24
|
54634
|
[
FF
D8
FF
E0
00
1.
..|
|
file
:
/
Users
/
jules
...|
2020
-
02
-
12
12
:
04
:
24
|
54624
|
[
FF
D8
FF
E0
00
1.
..|
|
file
:
/
Users
/
jules
...|
2020
-
02
-
12
12
:
04
:
24
|
54505
|
[
FF
D8
FF
E0
00
1.
..|
|
file
:
/
Users
/
jules
...|
2020
-
02
-
12
12
:
04
:
24
|
54475
|
[
FF
D8
FF
E0
00
1.
..|
+--------------------+-------------------+------+--------------------+
only
showing
top
5
rows
Ten en cuenta que la columna label
está ausente cuando la opción recursiveFileLookup
está configurada como "true"
.
Actualmente, la fuente de datos de archivos binarios no permite volver a escribir un Marco de datos en el formato de archivo original.
En esta sección, has visto cómo leer datos en un Marco de datos desde una serie de formatos de archivo compatibles. También te mostramos cómo crear vistas y tablas temporales a partir de las fuentes de datos incorporadas existentes. Tanto si utilizas la API del DataFrame como SQL, las consultas producen resultados idénticos. Puedes examinar algunas de estas consultas en el cuaderno disponible en el repositorio de GitHub de este libro.
Resumen
Para recapitular, este capítulo ha explorado la interoperabilidad entre la API de DataFrame y Spark SQL. En concreto, te has hecho una idea de cómo utilizar Spark SQL para:
Crea tablas gestionadas y no gestionadas utilizando Spark SQL y la API DataFrame.
Lee y escribe en varias fuentes de datos y formatos de archivo incorporados.
Utiliza la interfaz programática
spark.sql
para emitir consultas SQL sobre datos estructurados almacenados como tablas o vistas SQL de Spark.Navega por Spark
Catalog
para inspeccionar los metadatos asociados a tablas y vistas.Utiliza las API
DataFrameWriter
yDataFrameReader
.
A través de los fragmentos de código del capítulo y de los cuadernos disponibles en el repositorio GitHub del libro, te has hecho una idea de cómo utilizar DataFrames y Spark SQL. Siguiendo en esta línea, el siguiente capítulo explora más a fondo cómo interactúa Spark con las fuentes de datos externas que se muestran en la Figura 4-1. Verás algunos ejemplos más detallados de transformaciones y de la interoperabilidad entre la API de DataFrame y Spark SQL .
Get Aprender Spark, 2ª Edición now with the O’Reilly learning platform.
O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.