Capítulo 1. Introducción a Spark y PySpark
Este trabajo se ha traducido utilizando IA. Agradecemos tus opiniones y comentarios: translation-feedback@oreilly.com
Spark es un potente motor de análisis para el procesamiento de datos a gran escala que tiene como objetivo la velocidad, la facilidad de uso y la extensibilidad para aplicaciones de big data. Es una tecnología probada y ampliamente adoptada, utilizada por muchas empresas que manejan big data a diario. Aunque el lenguaje "nativo" de Spark es Scala (la mayor parte de Spark se desarrolla en Scala), también proporciona API de alto nivel en Java, Python y R.
En este libro utilizaremos Python a través de PySpark, una API que expone el modelo de programación Spark a Python. Dado que Python es el lenguaje de programación más accesible y que la API de Spark es potente y expresiva, la sencillez de PySpark hace que sea la mejor opción para nosotros. PySpark es una interfaz para Spark en el lenguaje de programación Python que proporciona las dos características importantes siguientes:
-
Nos permite escribir aplicaciones Spark utilizando las API de Python.
-
Proporciona el shell PySpark para analizar datos de forma interactiva en unentorno distribuido.
El propósito de este capítulo es presentar PySpark como el principal componente del ecosistema Spark y mostrarte que puede utilizarse eficazmente para tareas de big data como operaciones ETL, indexación de miles de millones de documentos, ingesta de millones de genomas, aprendizaje automático, análisis de datos de grafos, análisis de datos de ADN y mucho más. Empezaré revisando las arquitecturas de Spark y PySpark, y daré ejemplos para mostrar la potencia expresiva de PySpark. Presentaré una visión general de las principales funciones (transformaciones y acciones) y conceptos de Spark para que estés capacitado para empezar a utilizar Spark y PySpark de inmediato. Las principales abstracciones de datos de Spark son los conjuntos de datos distribuidos resilientes (RDD), los DataFrames y los Datasets. Como verás, puedes representar tus datos (almacenados como archivos de Hadoop, objetos de Amazon S3, archivos de Linux, estructuras de datos de colecciones, tablas de bases de datos relacionales, etc.) en cualquier combinación de RDDs yDataFrames.
Una vez que tus datos están representados como una abstracción de datos Spark, puedes aplicar transformaciones sobre ellos y crear nuevas abstracciones de datos hasta que los datos tengan la forma final que buscas. Las transformaciones de Spark (como map()
y reduceByKey()
) pueden utilizarse para convertir tus datos de una forma a otra hasta que obtengas el resultado deseado. En breve explicaré estas abstracciones de datos, pero antes, profundicemos un poco más en por qué Spark es la mejor opción para el análisis de datos.
Por qué Spark para el análisis de datos
Spark es un potente motor de análisis que puede utilizarse para el procesamiento de datos a gran escala. Las razones más importantes para utilizar Spark son:
-
Spark es sencillo, potente y rápido.
-
Spark es gratuito y de código abierto.
-
Spark funciona en todas partes (Hadoop, Mesos, Kubernetes, independiente o en la nube).
-
Spark puede leer/escribir datos desde/a cualquier fuente de datos (Amazon S3, Hadoop HDFS, bases de datos relacionales, etc.).
-
Spark puede integrarse con casi cualquier aplicación de datos.
-
Spark puede leer/escribir datos en formatos basados en filas (como Avro) y en columnas (como Parquet y ORC).
-
Spark dispone de un rico pero sencillo conjunto de API para todo tipo de procesos ETL.
En los últimos cinco años, Spark ha progresado de tal manera que creo que puede utilizarse para resolver cualquier problema de big data. Esto se ve respaldado por el hecho de que todas las empresas de big data, como Facebook, Illumina, IBM y Google, utilizan Spark a diario en sistemas de producción.
Spark es una de las mejores opciones para el procesamiento de datos a gran escala y para resolver problemas deMapReducey más allá, ya que libera el poder de los datos al manejar big data con potentes API y velocidad. Utilizar MapReduce/Hadoop para resolver problemas de big data es complejo, y tienes que escribir una tonelada de código de bajo nivel para resolver incluso problemas primitivos; aquí es donde entra en juego la potencia y simplicidad de Spark.Apache Sparkes considerablemente más rápido que ApacheHadoopporque utiliza almacenamiento en caché en memoria yejecución optimizada para un rendimiento rápido, y es compatible con el procesamiento general por lotes, la analítica de streaming, el aprendizaje automático, los algoritmos de gráficos y lasconsultas SQL.
Para PySpark, Spark tiene dos abstracciones de datos fundamentales: el RDD y el DataFrame. Te enseñaré cómo leer tus datos y representarlos como un RDD (un conjunto de elementos del mismo tipo) o un DataFrame (una tabla de filas con columnas con nombre); esto te permite imponer una estructura a una colección distribuida de datos, permitiendo una abstracción de nivel superior. Una vez representados tus datos como un RDD o un DataFrame, puedes aplicar sobre ellos funciones de transformación (como mapeadores, filtros y reductores) para transformar tus datos en la forma deseada. Te presentaré muchas transformaciones de Spark que puedes utilizar para procesos ETL, análisis y cálculos intensivos de datos.
En la Figura 1-1 se representan algunas transformaciones RDD sencillas.
Esta figura muestra las siguientes transformaciones:
-
Primero leemos nuestros datos de entrada (representados como un archivo de texto, ejemplo.txt-aquísólo muestro las dos primeras filas/registros de datos de entrada) con una instancia de
SparkSession
, que es el punto de entrada a la programación de Spark. La instancia deSparkSession
se representa como un objetospark
. La lectura de la entrada crea un nuevo RDD comoRDD[String]
: cada registro de entrada se convierte en un elemento RDD del tipoString
(si tu ruta de entrada tieneN
registros, entonces el número de elementos RDD esN
). Esto se consigue con el siguiente código:# Create an instance of SparkSession
spark
=
SparkSession
.
builder
.
getOrCreate
()
# Create an RDD[String], which represents all input
# records; each record becomes an RDD element
records
=
spark
.
sparkContext
.
textFile
(
"sample.txt"
)
-
A continuación, convertimos todos los caracteres a minúsculas. Esto se consigue mediante la transformación
map()
, que es una transformación de 1 a 1:# Convert each element of the RDD to lowercase
# x denotes a single element of the RDD
# records: source RDD[String]
# records_lowercase: target RDD[String]
records_lowercase
=
records
.
map
(
lambda
x
:
x
.
lower
())
-
A continuación, utilizamos una transformación
flatMap()
, que es una transformación de 1 a muchos, para convertir cada elemento (que representa un único registro) en una secuencia de elementos de destino (cada uno de los cuales representa una palabra).La transformaciónflatMap()
devuelve un nuevo RDD aplicando primero una función (aquí,split(",")
) a todos los elementos del RDD de origen y aplanando después los resultados:# Split each record into a list of words
# records_lowercase: source RDD[String]
# words: target RDD[String]
words
=
records_lowercase
.
flatMap
(
lambda
x
:
x
.
split
(
","
))
-
Por último, eliminamos los elementos de palabra con una longitud inferior o igual a 2. La siguiente transformación
filter()
elimina las palabras no deseadas, manteniendo sólo las que tienen una longitud superior a 2:# Keep words with a length greater than 2
# x denotes a word
# words: source RDD[String]
# filtered: target RDD[String]
filtered
=
words
.
filter
(
lambda
x
:
len
(
x
)
>
2
)
Como puedes observar, las transformaciones de Spark son de alto nivel, potentes y sencillas. Spark es por naturaleza distribuido y paralelo: tus datos de entrada están particionados y pueden ser procesados por transformaciones (como mapeadores, filtros y reductores) en paralelo en un entorno de clúster. En pocas palabras, para resolver un problema de análisis de datos en PySpark, lees los datos y los representas como un RDD o DataFrame (dependiendo de la naturaleza del formato de los datos), luego escribes un conjunto de transformaciones para convertir tus datos en la salida deseada. Spark particiona automáticamente tus DataFrames y RDDs y distribuye las particiones a través de diferentes nodos de clúster. Las particiones son las unidades básicas del paralelismo en Spark. El paralelismo es lo que permite a los desarrolladores realizar tareas en cientos de servidores informáticos de un clúster de forma paralela e independiente. Una partición en Spark es un trozo (una división lógica) de datos almacenados en un nodo del clúster. Los DataFrames y los RDD son colecciones de particiones. Spark tiene un particionador de datos por defecto para RDDs y DataFrames, pero puedes anular esa partición con tu propiaprogramación personalizada.
A continuación, vamos a profundizar un poco más en el ecosistema y la arquitectura de Spark.
El ecosistema Spark
El ecosistema de Spark se presenta enla Figura 1-2. Tiene tres componentes principales:
- Entornos
-
Spark puede ejecutarse en cualquier lugar y se integra bien con otros entornos.
- Aplicaciones
-
Spark se integra bien con diversas plataformas y aplicaciones de big data.
- Fuentes de datos
-
Spark puede leer y escribir datos desde y hacia muchas fuentes de datos.
El amplio ecosistema de Spark hace de PySpark una gran herramienta para ETL, análisis de datos y muchas otras tareas. Con PySpark, puedes leer datos de muchas fuentes de datos diferentes (el sistema de archivos de Linux, Amazon S3, el Sistema de Archivos Distribuidos de Hadoop, tablas relacionales, MongoDB, Elasticsearch, archivos Parquet, etc.) y representarlos como una abstracción de datos Spark, como RDDs o DataFrames. Una vez que tus datos tengan esa forma, puedes utilizar una serie de transformaciones Spark sencillas y potentes para transformar los datos en la forma y el formato deseados. Por ejemplo, puedes utilizar la transformación filter()
para eliminar los registros no deseados, utilizar groupByKey()
para agrupar tus datos por la clave que desees y, por último, utilizar la transformación mapValues()
para realizar la agregación final (como hallar la media, la mediana y la desviación estándar de los números) en los datos agrupados. Todas estas transformaciones son muy posibles utilizando la sencilla pero potente API de PySpark.
Arquitectura Spark
Cuando tienes datos pequeños, es posible analizarlos con un solo ordenador en un tiempo razonable. Cuando tienes grandes volúmenes de datos, utilizar un solo ordenador para analizar y procesar esos datos (y almacenarlos) puede ser prohibitivamente lento, o incluso imposible. Por eso queremos utilizar Spark.
Spark tiene una biblioteca central y un conjunto de bibliotecas incorporadas (SQL, GraphX, Streaming, MLlib), como se muestra en la Figura 1-3. Como puedes ver, a través de su API DataSource, Spark puede interactuar con muchas fuentes de datos, como Hadoop, HBase, Amazon S3, Elasticsearch y MySQL, por mencionar algunas.
Esta figura muestra la verdadera potencia de Spark: puedes utilizar varios lenguajes diferentes para escribir tus aplicaciones Spark, y luego utilizar ricas bibliotecas para resolver diversos problemas de big data. Mientras tanto, puedes leer/escribir datos de diversas fuentes de datos.
Términos clave
Para comprender la arquitectura de Spark, necesitarás entender algunos términos clave:
SparkSession
-
La clase
SparkSession
, definida en el paquetepyspark.sql
, es el punto de entrada para programar Spark con las APIs Dataset y DataFrame. Para hacer algo útil con un clúster Spark, primero tienes que crear una instancia de esta clase, que te da acceso a una instancia deSparkContext
.Nota
PySparkdispone de una completa API (compuesta por paquetes, módulos, clases y métodos) para acceder a la API de Spark. Es importante tener en cuenta que todas las API, paquetes, módulos, clases y métodos de Spark que se tratan en este libro son específicos de PySpark. Por ejemplo, cuando me refiero a la clase
SparkContext
me estoy refiriendo a la clase Pythonpyspark.SparkContext
, definida en el paquetepyspark
, y cuando me refiero a la claseSparkSession
, me estoy refiriendo a la clase Pythonpyspark.sql.SparkSession
, definida en el módulopyspark.sql
. SparkContext
-
La clase
SparkContext
, definida en el paquetepyspark
, es el principal punto de entrada a la funcionalidad Spark. UnSparkContext
mantiene una conexión con el gestor del clúster Spark y puede utilizarse para crear RDDs y variables de difusión en el clúster. Cuando creas una instancia deSparkSession
, laSparkContext
pasa a estar disponible dentro de tu sesión como un atributo,SparkSession.sparkContext
. - Conductor
-
Todas las aplicaciones Spark (incluido el shell PySpark y los programas Python independientes) se ejecutan como conjuntos independientes de procesos. Estos procesos están coordinados por un
SparkContext
en un programa controlador. Para enviar un programa Python independiente a Spark, tienes que escribir un programa controlador con la API PySpark (o Java o Scala). Este programa se encarga del proceso de ejecutar la funciónmain()
de la aplicación y de crear elSparkContext
. También se puede utilizar para crear RDDs y DataFrames. - Trabajador
-
En un entorno de clúster Spark, hay dos tipos de nodos: uno (o dos, para alta disponibilidad) maestro y un conjunto de trabajadores. Un trabajador es cualquier nodo que puede ejecutar programas en el clúster. Si se lanza un proceso para una aplicación, esta aplicación adquiere ejecutores en los nodos worker, que se encargan de ejecutar las tareas Spark.
- Gestor de clústeres
-
El nodo "maestro" se conoce como gestor del clúster. La función principal de este nodo es gestionar el entorno del clúster y los servidores que Spark aprovechará para ejecutar las tareas. El gestor de clúster asigna recursos a cada aplicación. Spark admite cinco tipos de gestores de clúster, dependiendo de dónde se esté ejecutando:
-
Independiente (el propio entorno en clúster incorporado de Spark)
-
Mesos (un núcleo de sistemas distribuidos)
-
Nota
Aunque el uso de la terminología maestro/trabajador está pasado de moda y se está retirando en muchos contextos de software, sigue formando parte de la funcionalidad de Apache Spark, que es por lo que utilizo esta terminología en este libro.
La arquitectura Spark en pocas palabras
En la Figura 1-4 se presenta una vista de alto nivel de la arquitectura Spark. Informalmente, un clúster Spark se compone de un nodo maestro (el "gestor del clúster"), que se encarga de gestionar las aplicaciones Spark, y un conjunto de nodos "trabajadores" (ejecutores), que se encargan de ejecutar las tareas enviadas por las aplicaciones Spark (tus aplicaciones, que quieres ejecutar en el clúster Spark).
Dependiendo del entorno en el que se esté ejecutando Spark, el gestor de clúster que gestione este clúster de servidores será el gestor de clúster independiente de Spark, Kubernetes, Hadoop YARN o Mesos. Cuando el clúster Spark está en funcionamiento, puedes enviar aplicaciones Spark al gestor de clúster, que concederá recursos a tu aplicación para que puedas completar tu análisis de datos.
Tu clúster puede tener uno, decenas, cientos o incluso miles de nodos trabajadores, dependiendo de las necesidades de tu empresa y de los requisitos de tu proyecto. Puedes ejecutar Spark en un servidor independiente, como un MacBook, un Linux o un PC con Windows, peronormalmente, para entornos de producción, Spark se ejecuta en un clúster de servidores Linux. Para ejecutar un programa Spark, necesitas tener acceso a un clúster Spark y disponer de un programa controlador, que declare las transformaciones y acciones sobre RDDs de datos y envíe dichas peticiones al gestor del clúster. En este libro, todos los programas controladores estarán en PySpark.
Cuando inicias una shell PySpark (ejecutando<spark-installed-dir>/bin/pyspark
), se definen automáticamente dos variables/objetos:
spark
-
Una instancia de
SparkSession
, que es ideal para crear DataFrames sc
-
Una instancia de
SparkContext
, que es ideal para crear RDDs
Si escribes una aplicación PySpark autónoma (un controlador Python, que utiliza la API PySpark), entonces tienes que crear explícitamente tú mismo una instancia deSparkSession
. Puedes utilizar un SparkSession
para:
-
Crear marcos de datos
-
Registrar DataFrames como tablas
-
Ejecutar SQL sobre tablas y tablas caché
-
Lee/escribe texto, CSV, JSON, Parquet y otros formatos de archivo
-
Leer/escribir tablas de bases de datos relacionales
PySpark define SparkSession
como:
pyspark
.
sql
.
SparkSession
(
Python
class
,
in
pyspark
.
sql
module
)
class
pyspark
.
sql
.
SparkSession
(
sparkContext
,
jsparkSession
=
None
)
SparkSession: the entry point to programming Spark with the RDD and DataFrame API.
Para crear un SparkSession
en Python, utiliza el patrón constructor que se muestra aquí:
# import required Spark class
from
pyspark.sql
import
SparkSession
# create an instance of SparkSession as spark
spark
=
SparkSession
.
builder
\
.
master
(
"
local
"
)
\
.
appName
(
"
my-application-name
"
)
\
.
config
(
"
spark.some.config.option
"
,
"
some-value
"
)
\
.
getOrCreate
(
)
# to debug the SparkSession
(
spark
.
version
)
# create a reference to SparkContext as sc
# SparkContext is used to create new RDDs
sc
=
spark
.
sparkContext
# to debug the SparkContext
(
sc
)
Importa la clase
SparkSession
del módulopyspark.sql
.Proporciona acceso a la API del Constructor utilizada para construir instancias de
SparkSession
.Establece una opción de
config
. Las opciones establecidas mediante este método se propagan automáticamente tanto aSparkConf
como a la propia configuración deSparkSession
. Al crear un objetoSparkSession
, puedes definir cualquier número deconfig(<key>, <value>)
opciones.Obtiene un
SparkSession
existente o, si no hay ninguno, crea uno nuevo basándose en las opciones establecidas aquí.Sólo con fines de depuración.
Se puede hacer referencia a un
SparkContext
desde una instancia deSparkSession
.
PySpark define SparkContext
como:
class
pyspark
.
SparkContext
(
master
=
None
,
appName
=
None
,
...
)
SparkContext: the main entry point for Spark functionality. A SparkContext represents the connection to a Spark cluster, and can be used to create RDD (the main data abstraction for Spark) and broadcast variables (such as collections and data structures) on that cluster.
SparkContext
es el principal punto de entrada a la funcionalidad Spark. Un shell (como el shell de PySpark) o un programa controlador de PySpark no pueden crear más de una instancia de . Un representa la conexión a un clúster Spark, y puede utilizarse para crear nuevos RDD y variables de difusión (estructuras de datos compartidos y colecciones -una especie de variables globales de sólo lectura) en ese clúster. SparkContext
SparkContext
La Figura 1-5 muestra cómo se puede utilizar un para crear un nuevo RDD a partir de un archivo de texto de entrada (etiquetado ) y luego transformarlo en otro RDD (etiquetado ) utilizando la transformación . Como puedes observar SparkContext
records_rdd
words_rdd
flatMap()
RDD.flatMap(f)
devuelve un nuevo RDD aplicando primero una función (f
) a todos los elementos del RDD de origen, y luego aplanando los resultados.
Para crear los objetos SparkSession
y SparkContext
, utiliza el siguiente patrón:
# create an instance of SparkSession
spark_session
=
SparkSession
.
builder
.
getOrCreate
()
# use the SparkSession to access the SparkContext
spark_context
=
spark_session
.
sparkContext
Si vas a trabajar sólo con RDDs, puedes crear una instancia deSparkContext
del siguiente modo:
from
pyspark
import
SparkContext
spark_context
=
SparkContext
(
"local"
,
"myapp"
);
Ahora que ya conoces los fundamentos de Spark, vamos a profundizar un poco más en PySpark.
El poder de PySpark
PySpark es una API de Python para Apache Spark, diseñada para apoyar la colaboración entre Spark y el lenguaje de programación Python. La mayoría de los científicos de datos ya conocen Python, y PySpark les facilita la escritura de código corto y conciso para la computación distribuida mediante Spark. En pocas palabras, es un ecosistema todo en uno que puede gestionar requisitos de datos complejos gracias a su compatibilidad con RDD, DataFrames, GraphFrames, MLlib, SQL y mucho más.
Te mostraré la increíble potencia de PySpark con un sencillo ejemplo. Supongamos que tenemos montones de registros que contienen datos sobre las visitas a URL por parte de los usuarios (recogidos por un motor de búsqueda de muchos servidores web) con el siguiente formato:
<url_address><,><frequency>
Aquí tienes algunos ejemplos de cómo son estos registros:
http://mapreduce4hackers.com,19779 http://mapreduce4hackers.com,31230 http://mapreduce4hackers.com,15708 ... https://www.illumina.com,87000 https://www.illumina.com,58086 ...
Supongamos que queremos encontrar la media, la mediana y la desviación típica de los números de visitas por clave (es decir, url_address
). Otro requisito es que queremos descartar cualquier registro con una longitud inferior a 5 (ya que puede tratarse de URL malformadas). Es fácil expresar una solución elegante para esto en PySpark, como ilustra la Figura 1-6.
En primer lugar, vamos a crear algunas funciones básicas de Python que nos ayudarán a resolver nuestro sencillo problema. La primera función, create_pair()
, acepta un único registro de la forma <url_address><,><frequency>
y devuelve un par (clave, valor) (que nos permitirá hacer unGROUP
BY
sobre el campo clave más adelante), donde la clave es un url_address
y el valor es el frequency
asociado:
# Create a pair of (url_address, frequency)
# where url_address is a key and frequency is a value
# record denotes a single element of RDD[String]
# record: <url_address><,><frequency>
def
create_pair
(
record
)
:
tokens
=
record
.
split
(
'
,
'
)
url_address
=
tokens
[
0
]
frequency
=
tokens
[
1
]
return
(
url_address
,
frequency
)
#end-def
Acepta un registro de la forma
<url_address><,><frequency>
.Tokeniza el registro de entrada, utilizando el
url_address
como clave (tokens[0]
) y elfrequency
como valor (tokens[1]
).Devuelve un par de
(url_address, frequency)
.
La siguiente función, compute_stats()
, acepta una lista de frecuencias (como números) y calcula tres valores, la media, la mediana y la desviación típica:
# Compute average, median, and standard
# deviation for a given set of numbers
import
statistics
# frequencies = [number1, number2, ...]
def
compute_stats
(
frequencies
)
:
average
=
statistics
.
mean
(
frequencies
)
median
=
statistics
.
median
(
frequencies
)
standard_deviation
=
statistics
.
stdev
(
frequencies
)
return
(
average
,
median
,
standard_deviation
)
#end-def
Este módulo proporciona funciones para calcular estadísticas matemáticas de datos numéricos.
Acepta una lista de frecuencias.
Calcula la media de las frecuencias.
Calcula la mediana de las frecuencias.
Calcula la desviación típica de las frecuencias.
Devuelve el resultado como una tripleta.
A continuación, te mostraré la asombrosa potencia de PySpark en unas pocas líneas de código, utilizando transformaciones Spark y nuestras funciones Python personalizadas:
# input_path = "s3://<bucket>/key"
input_path
=
"
/tmp/myinput.txt
"
results
=
spark
.
sparkContext
.
textFile
(
input_path
)
.
filter
(
lambda
record
:
len
(
record
)
>
5
)
.
map
(
create_pair
)
.
groupByKey
(
)
.
mapValues
(
compute_stats
)
spark
denota una instancia deSparkSession
, el punto de entrada a la programación de Spark.sparkContext
(un atributo deSparkSession
) es el principal punto de entrada a la funcionalidad Spark.Lee los datos como un conjunto distribuido de registros
String
(crea unRDD[String]
).Elimina los registros con una longitud inferior o igual a 5 (conserva los registros con una longitud superior a 5).
Crea pares
(url_address, frequency)
a partir de los registros de entrada.Agrupa los datos por claves: cada clave (un
url_address
) se asociará a una lista de frecuencias.Aplica la función
compute_stats()
a la lista de frecuencias.
El resultado será un conjunto de pares (clave, valor) de la forma
(url_address, (average, median, standard_deviation))
donde url-address
es una clave y(average, median, standard_deviation)
es un valor.
Nota
Lo más importante de Spark es que maximiza la concurrencia de funciones y operaciones mediante la partición de datos. Considera un ejemplo:
Si tus datos de entrada tienen 600.000 millones de filas y utilizas un clúster de 10 nodos, tus datos de entrada se particionarán en N
( > 1) trozos, que se procesan independientemente y en paralelo. Si N=20,000
(el número de trozos o particiones), entonces cada trozo tendrá unos 30 millones de registros/elementos (600.000.000.000 / 20.000 = 30.000.000). Si tienes un clúster grande, entonces los 20.000 trozos podrían procesarse de una sola vez. Si tienes un clúster más pequeño, puede que sólo se puedan procesar cada 100 trozos de forma independiente y en paralelo. Este proceso continuará hasta que se procesen los 20.000 trozos.
Arquitectura de PySpark
PySpark se construye sobre la API Java de Spark. Los datos se procesan en Python y se almacenan/barajan en caché en la Máquina Virtual Java, o JVM (trataré el concepto de barajar en el Capítulo 2). En la Figura 1-7 se presenta una vista de alto nivel de la arquitectura de PySpark.
Y el flujo de datos de PySpark se ilustra en la Figura 1-8.
En el programa del controlador Python (tu aplicación Spark en Python), el SparkContext
utiliza Py4Jpara lanzar una JVM, creando unJavaSparkContext
. Py4J sólo se utiliza en el controlador para la comunicación local entre los objetos SparkContext
de Python y Java; las grandes transferencias de datos se realizan a través de un mecanismo diferente. Las transformaciones RDD en Python se mapean a transformaciones sobre objetosPythonRDD
en Java. En las máquinas de trabajo remotas, los objetos PythonRDD
lanzan subprocesos Python y se comunican con ellos mediante tuberías, enviando el código del usuario y los datos que deben procesarse.
Nota
Py4J permite a los programas Python que se ejecutan en un intérprete de Python acceder dinámicamente a objetos Java en una JVM. Los métodos se invocan como si los objetos Java residieran en el intérprete de Python, y se puede acceder a las colecciones Java mediante métodos de colección estándar de Python. Py4J también permite a los programas Java devolver llamadas a objetos Python.
Abstracciones de datos Spark
Para manipular datos en el lenguaje de programación Python, utilizas enteros, cadenas, listas y diccionarios. Para manipular y analizar datos en Spark, tienes que representarlos como un conjunto de datos Spark. Spark admite tres tipos de abstracciones de conjuntos de datos:
-
RDD (conjunto de datos distribuidos resilientes):
-
API de bajo nivel
-
Denotado por
RDD[T]
(cada elemento tiene el tipoT
)
-
-
DataFrame (similar a las tablas relacionales):
-
API de alto nivel
-
Denotado por
Table(column_name_1, column_name_2, ...)
-
-
Conjunto de datos (similar a las tablas relacionales):
-
API de alto nivel (no disponible en PySpark)
-
La abstracción de datos Dataset se utiliza en lenguajes fuertemente tipados como Java y no está soportada en PySpark. Los RDDs y los DataFrames se tratarán en detalle en los siguientes capítulos, pero aquí haré una breve introducción.
Ejemplos de RDD
Esencialmente, un RDD representa tus datos como una colección de elementos. Es un conjunto inmutable de elementos distribuidos de tipo T
, denotado como RDD[T]
.
La Tabla 1-1 muestra ejemplos de tres tipos sencillos de RDD:
RDD[Integer]
-
Cada elemento es un
Integer
. RDD[String]
-
Cada elemento es un
String
. RDD[(String, Integer)]
-
Cada elemento es un par de
(String, Integer)
.
RDD[Entero] | RDD[Cadena] | RDD[(Cadena, Entero)] |
---|---|---|
|
|
|
|
|
|
|
|
|
... |
... |
... |
La Tabla 1-2 es un ejemplo de RDD complejo. Cada elemento es un par (clave, valor), donde la clave es un String
y el valor es una tripleta de (Integer, Integer, Double)
.
RDD[(String, (Integer, Integer, Double))] |
---|
|
|
|
|
... |
Operaciones Spark RDD
Los RDD de Spark son de sólo lectura, inmutables y distribuidos. Una vez creados, no pueden alterarse: no puedes añadir registros, borrar registros ni actualizar registros en un RDD. Sin embargo, se pueden transformar. Los RDD admiten dos tipos de operaciones: las transformaciones, que transforman los RDD fuente en uno o varios RDD nuevos, y las acciones, que transforman los RDD fuente en un objeto que no es un RDD, como un diccionario o una matriz. La relación entre RDDs, transformaciones y acciones se ilustra en la Figura 1-9.
Entraremos en mucho más detalle sobre las transformaciones de Spark en los capítulos siguientes, con ejemplos prácticos que te ayudarán a comprenderlas, pero aquí te proporcionaré una breve introducción.
Transformaciones
Una transformación en Spark es una función que toma un RDD existente (el RDD de origen), le aplica una transformación y crea un nuevo RDD (el RDD de destino). Algunos ejemplos son: map()
,flatMap()
, groupByKey()
, reduceByKey()
, yfilter()
.
Informalmente, podemos expresar una transformación como:
transformation: source_RDD[V] --> target_RDD[T]
Los RDD no se evalúan hasta que se realiza una acción sobre ellos: esto significa que las transformaciones se evalúan perezosamente. Si un RDD falla durante una transformación, el linaje de datos de las transformaciones reconstruye el RDD.
La mayoría de las transformaciones Spark crean un único RDD, pero también es posible que creen varios RDD de destino. Los RDD de destino pueden ser más pequeños, más grandes o del mismo tamaño que el RDD de origen.
El siguiente ejemplo presenta una secuencia de transformaciones:
tuples
=
[(
'A'
,
7
),
(
'A'
,
8
),
(
'A'
,
-
4
),
(
'B'
,
3
),
(
'B'
,
9
),
(
'B'
,
-
1
),
(
'C'
,
1
),
(
'C'
,
5
)]
rdd
=
spark
.
sparkContext
.
parallelize
(
tuples
)
# drop negative values
positives
=
rdd
.
filter
(
lambda
x
:
x
[
1
]
>
0
)
positives
.
collect
()
[(
'A'
,
7
),
(
'A'
,
8
),
(
'B'
,
3
),
(
'B'
,
9
),
(
'C'
,
1
),
(
'C'
,
5
)]
# find sum and average per key using groupByKey()
sum_and_avg
=
positives
.
groupByKey
()
.
mapValues
(
lambda
v
:
(
sum
(
v
),
float
(
sum
(
v
))
/
len
(
v
)))
# find sum and average per key using reduceByKey()
# 1. create (sum, count) per key
sum_count
=
positives
.
mapValues
(
lambda
v
:
(
v
,
1
))
# 2. aggregate (sum, count) per key
sum_count_agg
=
sum_count
.
reduceByKey
(
lambda
x
,
y
:
(
x
[
0
]
+
y
[
0
],
x
[
1
]
+
y
[
1
]))
# 3. finalize sum and average per key
sum_and_avg
=
sum_count_agg
.
mapValues
(
lambda
v
:
(
v
[
0
],
float
(
v
[
0
])
/
v
[
1
]))
Consejo
La transformación groupByKey()
agrupa los valores de cada clave del RDD en una única secuencia, similar a una sentencia SQL GROUP
BY
. Esta transformación puede provocar errores de falta de memoria (OOM), ya que los datos se envían a través de la red de servidores Spark y se recopilan en los reductores/trabajadores cuando el número de valores por clave es de miles o millones.
Con la transformación reduceByKey()
, sin embargo, los datos se combinan en cada partición, por lo que sólo hay una salida para cada clave en cada partición que enviar a través de la red de servidores Spark. Esto la hace más escalable que groupByKey()
. reduceByKey()
combina los valores de cada clave utilizando una función de reducción asociativa y conmutativa. Combina todos los valores (por clave) en otro valor con exactamente el mismo tipo de datos (esto es una limitación, que puede superarse utilizando la transformacióncombineByKey()
). En general, reduceByKey()
es más escalable que groupByKey()
. Hablaremos más de estos temas en el Capítulo 4.
Acciones
Las acciones Spark son operaciones o funciones RDD que producen valores no RDD. Informalmente, podemos expresar una acción como:
action: RDD => non-RDD value
Las acciones pueden desencadenar la evaluación de RDDs (que, como recordarás, se evalúan perezosamente). Sin embargo, la salida de una acción es un valor tangible: un archivo guardado, un valor como un entero, un recuento de elementos, una lista de valores, un diccionario, etc.
Los siguientes son ejemplos de acciones:
reduce()
-
Aplica una función para obtener un único valor, como sumar valores para un determinado
RDD[Integer]
collect()
-
Convierte un
RDD[T]
en una lista de tipoT
count()
-
Encuentra el número de elementos de un RDD dado
saveAsTextFile()
-
Guarda los elementos del RDD en un disco
saveAsMap()
-
Guarda los elementos de
RDD[(K, V)]
en un disco como archivodict[K, V]
Ejemplos de DataFrame
Similar a un RDD, un DataFrame en Spark es una colección distribuida inmutable de datos. Pero a diferencia de un RDD, los datos se organizan en columnas con nombre, como una tabla en una base de datos relacional. Esto se hace para facilitar el procesamiento de grandes conjuntos de datos. Los DataFrames permiten a los programadores imponer una estructura a una colección distribuida de datos, permitiendo una abstracción de mayor nivel. También hacen que el procesamiento de archivos CSV y JSON sea mucho más fácil que con los RDD.
El siguiente ejemplo de DataFrame tiene tres columnas:
DataFrame[name, age, salary] name: String, age: Integer, salary: Integer +-----+----+---------+ | name| age| salary| +-----+----+---------+ | bob| 33| 45000| | jeff| 44| 78000| | mary| 40| 67000| | ...| ...| ...| +-----+----+---------+
Un DataFrame puede crearse a partir de muchas fuentes diferentes, como tablas Hive, Archivos de Datos Estructurados (SDF), bases de datos externas o RDD existentes. La API de DataFrames se diseñó para aplicaciones modernas de big data y ciencia de datos, inspirándose en DataFrames en R y pandas en Python. Como veremos en capítulos posteriores, podemos ejecutar consultas SQL contra DataFrames.
Spark SQL viene con un amplio conjunto de potentes operaciones DataFrame que incluye:
-
Funciones de agregación (mín, máx, suma, media, etc.)
-
Funciones de recogida
-
Funciones matemáticas
-
Funciones de clasificación
-
Funciones de cadena
-
Funciones definidas por el usuario (UDF)
Por ejemplo, puedes leer fácilmente un archivo CSV y crear un DataFrame a partir de él:
# define input path
virus_input_path
=
"s3://mybucket/projects/cases/case.csv"
# read CSV file and create a DataFrame
cases_dataframe
=
spark
.
read
.
load
(
virus_input_path
,
format
=
"csv"
,
sep
=
","
,
inferSchema
=
"true"
,
header
=
"true"
)
# show the first 3 rows of created DataFrame
cases_dataframe
.
show
(
3
)
+-------+-------+-----------+--------------+---------+
|
case_id
|
country
|
city
|
infection_case
|
confirmed
|
+-------+-------+-----------+--------------+---------+
|
C0001
|
USA
|
New
York
|
contact
|
175
|
+-------+-------+-----------+--------------+---------+
|
C0008
|
USA
|
New
Jersey
|
unknown
|
25
|
+-------+-------+-----------+--------------+---------+
|
C0009
|
USA
|
Cupertino
|
contact
|
100
|
+-------+-------+-----------+--------------+---------+
Para ordenar los resultados por número de casos en orden descendente, podemos utilizar la función sort()
:
# We can do this using the F.desc function:
from
pyspark.sql
import
functions
as
F
cases_dataframe
.
sort
(
F
.
desc
(
"confirmed"
))
.
show
()
+-------+-------+-----------+--------------+---------+
|
case_id
|
country
|
city
|
infection_case
|
confirmed
|
+-------+-------+-----------+--------------+---------+
|
C0001
|
USA
|
New
York
|
contact
|
175
|
+-------+-------+-----------+--------------+---------+
|
C0009
|
USA
|
Cupertino
|
contact
|
100
|
+-------+-------+-----------+--------------+---------+
|
C0008
|
USA
|
New
Jersey
|
unknown
|
25
|
+-------+-------+-----------+--------------+---------+
También podemos filtrar filas fácilmente:
cases_dataframe
.
filter
((
cases_dataframe
.
confirmed
>
100
)
&
(
cases_dataframe
.
country
==
'USA'
))
.
show
()
+-------+-------+-----------+--------------+---------+
|
case_id
|
country
|
city
|
infection_case
|
confirmed
|
+-------+-------+-----------+--------------+---------+
|
C0001
|
USA
|
New
York
|
contact
|
175
|
+-------+-------+-----------+--------------+---------+
...
Para que te hagas una mejor idea de la potencia de los DataFrames de Spark, veamos un ejemplo. Crearemos un DataFrame y hallaremos la media y la suma de horas trabajadas por los empleados por departamento:
# Import required libraries
from
pyspark.sql
import
SparkSession
from
pyspark.sql.functions
import
avg
,
sum
# Create a DataFrame using SparkSession
spark
=
SparkSession
.
builder
.
appName
(
"demo"
)
.
getOrCreate
()
dept_emps
=
[(
"Sales"
,
"Barb"
,
40
),
(
"Sales"
,
"Dan"
,
20
),
(
"IT"
,
"Alex"
,
22
),
(
"IT"
,
"Jane"
,
24
),
(
"HR"
,
"Alex"
,
20
),
(
"HR"
,
"Mary"
,
30
)]
df
=
spark
.
createDataFrame
(
dept_emps
,
[
"dept"
,
"name"
,
"hours"
])
# Group the same depts together, aggregate their hours, and compute an average
averages
=
df
.
groupBy
(
"dept"
)
.
agg
(
avg
(
"hours"
)
.
alias
(
'average'
),
sum
(
"hours"
)
.
alias
(
'total'
))
# Show the results of the final execution
averages
.
show
()
+-----+--------+------+
|
dept
|
average
|
total
|
+-----+--------+------+
|
Sales
|
30.0
|
60.0
|
|
IT
|
23.0
|
46.0
|
|
HR
|
25.0
|
50.0
|
+-----+--------+------+
Como puedes ver, los DataFrames de Spark son lo suficientemente potentes como para manipular miles de millones de filas con funciones sencillas pero potentes.
Utilizar la Shell de PySpark
Hay dos formas principales de utilizar PySpark:
-
Utiliza el intérprete de comandos PySpark (para pruebas y programación interactiva).
-
Utiliza PySpark en una aplicación autónoma. En este caso, escribes un programa controlador Python (digamos,mi_programa_pyspark.py) utilizando la API PySpark y luego lo ejecutas con el comando
spark-submit
:export SUBMIT=$SPARK_HOME/bin/spark-submit $SUBMIT [options] my_pyspark_program.py <parameters>
donde
<parameters>
es una lista de parámetros consumidos por tu programa PySpark(mi_programa_pyspark.py).
Nota
Para más detalles sobre el uso del comando spark-submit
, consulta"Envío de solicitudes" en la documentación de Spark.
En esta sección nos centraremos en el shell interactivo de Spark para usuarios de Python, una potente herramienta que puedes utilizar para analizar datos de forma interactiva y ver los resultados inmediatamente (Spark también proporciona un shell Scala). La shell PySpark puede funcionar tanto en instalaciones de una sola máquina como en instalaciones en clúster de Spark. Utiliza el siguiente comando para iniciar el intérprete de comandos, donde SPARK_HOME
indica el directorio de instalación de Spark en tu sistema:
export SPARK_HOME=<spark-installation-directory> $SPARK_HOME/bin/pyspark
Por ejemplo:
export SPARK_HOME="/home/spark" $SPARK_HOME/bin/pyspark Python 3.7.2 Welcome to Spark version 3.1.2 Using Python version 3.7.2 SparkSession available as spark. SparkContext available as sc >>>
Cuando inicias el intérprete de comandos, PySpark muestra alguna información útil, como detalles sobre las versiones de Python y Spark que está utilizando (ten en cuenta que aquí se ha acortado la salida). El símbolo >>>
se utiliza como indicador del intérprete de comandos PySpark. Este indicador indica que ya puedes escribir comandos Python o PySpark y ver los resultados.
Para que te sientas cómodo con el intérprete de comandos PySpark, las siguientes secciones te guiarán a través de algunos ejemplos de uso básico.
Iniciar la Shell de PySpark
Para entrar en un intérprete de comandos PySpark, ejecutamos pyspark
del siguiente modo:
$SPARK_HOME/bin/pyspark
Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /__ / .__/\_,_/_/ /_/\_\ version 3.1.2 /_/ SparkSession available as 'spark'. SparkContext available as 'sc'.
>>
>
sc
.
version
'
3.1.2
'
>>
>
spark
.
version
'
3.1.2
'
Ejecutando
pyspark
se creará un nuevo intérprete de comandos. La salida aquí se ha acortado.Comprueba que
SparkContext
se crea comosc
.Comprueba que
SparkSession
se crea comospark
.
Una vez que entras en el shell de PySpark, se crea una instancia de SparkSession
como variable spark
y una instancia de SparkContext
como variable sc
. Como aprendiste anteriormente en este capítulo, el SparkSession
es el punto de entrada a la programación de Spark con las APIs Dataset y DataFrame; un SparkSession
puede utilizarse para crear DataFrames, registrar DataFrames como tablas, ejecutar SQL sobre tablas, almacenar tablas en caché y leer archivos CSV, JSON y Parquet.
Si quieres utilizar PySpark en una aplicación autónoma, entonces tienes que crear explícitamente un SparkSession
utilizando el patrón constructor que se muestra en "La arquitectura Spark en pocas palabras". Un SparkContext
es el principal punto de entrada a la funcionalidad Spark; puede utilizarse para crear RDDs a partir de archivos de texto y colecciones Python. Lo veremos a continuación.
Crear un RDD a partir de una colección
Spark nos permite crear nuevos RDD a partir de archivos y colecciones (estructuras de datos como matrices y listas). Aquí, utilizamos SparkContext.parallelize()
para crear un nuevo RDD a partir de una colección (representada como data
):
>>
>
data
=
[
(
"
fox
"
,
6
)
,
(
"
dog
"
,
5
)
,
(
"
fox
"
,
3
)
,
(
"
dog
"
,
8
)
,
(
"
cat
"
,
1
)
,
(
"
cat
"
,
2
)
,
(
"
cat
"
,
3
)
,
(
"
cat
"
,
4
)
]
>>
>
# use SparkContext (sc) as given by the PySpark shell
>>
>
# create an RDD as rdd
>>
>
rdd
=
sc
.
parallelize
(
data
)
>>
>
rdd
.
collect
(
)
[
(
'
fox
'
,
6
)
,
(
'
dog
'
,
5
)
,
(
'
fox
'
,
3
)
,
(
'
dog
'
,
8
)
,
(
'
cat
'
,
1
)
,
(
'
cat
'
,
2
)
,
(
'
cat
'
,
3
)
,
(
'
cat
'
,
4
)
]
>>
>
rdd
.
count
(
)
8
Agregar y fusionar valores de claves
La transformación reduceByKey()
se utiliza para fusionar y agregar valores. En este ejemplo, x
y y
se refieren a los valores de la misma clave:
>>
>
sum_per_key
=
rdd
.
reduceByKey
(
lambda
x
,
y
:
x
+
y
)
>>
>
sum_per_key
.
collect
(
)
[
(
'
fox
'
,
9
)
,
(
'
dog
'
,
13
)
,
(
'
cat
'
,
10
)
]
El RDD fuente de esta transformación debe estar formado por pares (clave, valor). reduceByKey()
fusiona los valores de cada clave utilizando una función de reducción asociativa y conmutativa. También realizará la fusión localmente en cada mapeador antes de enviar los resultados a un reductor, de forma similar a un "combinador" en MapReduce. La salidase particionará con particiones numPartitions
, o el nivel de paralelismo por defecto si no se especifica numPartitions
. El particionador por defecto es HashPartitioner
.
Si T
es el tipo del valor de los pares (clave, valor), entonces reduceByKey()
's func()
puede definirse como:
# source_rdd : RDD[(K, T)]
# target_rdd : RDD[(K, T)]
target_rdd
=
source_rdd
.
reduceByKey
(
lambda
x
,
y
:
func
(
x
,
y
)
)
# OR you may write it by passing the function name
# target_rdd = source_rdd.reduceByKey(func)
# where
# func(T, T) -> T
# Then you may define `func()` in Python as:
# x: type of T
# y: type of T
def
func
(
x
,
y
)
:
result
=
<
aggregation
of
x
and
y
:
return
a
result
of
type
T
>
return
result
#end-def
Esto significa que:
-
Hay dos argumentos de entrada (del mismo tipo,
T
) para el reductorfunc()
. -
El tipo de retorno de
func()
debe ser el mismo que el tipo de entradaT
(esta limitación puede evitarse si utilizas la transformacióncombineByKey()
). -
El reductor
func()
tiene que ser asociativo. Informalmente, una operación binariaf()
sobre un conjuntoT
se denomina asociativa si cumple la ley asociativa, que establece que el orden en que se agrupan los números no cambia el resultado de la operación. -
El reductor
func()
tiene que ser conmutativo: informalmente, una funciónf()
para la quef(x, y) = f(y, x)
para todos los valores dex
yy
. Es decir, un cambio en el orden de los números no debe afectar al resultado de la operación.Ley conmutativa
f(x, y) = f(y, x)
La ley conmutativa también es válida para la suma y la multiplicación, pero no para la resta o la división. Por ejemplo:
5 + 3 = 3 + 5 pero 5 - 3 ≠ 3 - 5
Por tanto, no puedes utilizar operaciones de resta o división en una transformación reduceByKey()
.
Agrupar claves similares
Podemos utilizar la transformación groupByKey()
para agrupar los valores de cada clave del RDD en una única secuencia:
>>
>
grouped
=
rdd
.
groupByKey
(
)
>>
>
grouped
.
collect
(
)
[
(
'
fox
'
,
<
ResultIterable
object
at
0x10f45c790
>
)
,
(
'
dog
'
,
<
ResultIterable
object
at
0x10f45c810
>
)
,
(
'
cat
'
,
<
ResultIterable
object
at
0x10f45cd90
>
)
]
>>
>
>>
>
# list(v) converts v as a ResultIterable into a list
>>
>
grouped
.
map
(
lambda
(
k
,
v
)
:
(
k
,
list
(
v
)
)
)
.
collect
(
)
[
(
'
fox
'
,
[
6
,
3
]
)
,
(
'
dog
'
,
[
5
,
8
]
)
,
(
'
cat
'
,
[
1
,
2
,
3
,
4
]
)
]
Agrupa elementos de la misma clave en una secuencia de elementos.
Ver el resultado.
El nombre completo de
ResultIterable
espyspark.resultiterable.ResultIterable
.Primero aplica
map()
y luegocollect()
, que devuelve una lista que contiene todos los elementos del RDD resultante. La funciónlist()
convierteResultIterable
en una lista de objetos.
El RDD fuente de esta transformación debe estar compuesto por pares (clave, valor). groupByKey()
agrupa los valores de cada clave del RDD en una única secuencia, y divide en hash el RDD resultante con las particiones de numPartitions
, o con el nivel de paralelismo por defecto si no se especifica numPartitions
. Ten en cuenta que si estás agrupando (utilizando la transformacióngroupByKey()
) para realizar una agregación, como una suma o una media, sobre cada clave, utilizar reduceByKey()
o aggregateByKey()
proporcionará un rendimiento mucho mejor.
Agregar valores para claves similares
Para agregar y sumar los valores de cada clave, podemos utilizar la transformación mapValues()
y la función sum()
:
>>
>
aggregated
=
grouped
.
mapValues
(
lambda
values
:
sum
(
values
)
)
>>
>
aggregated
.
collect
(
)
[
(
'
fox
'
,
9
)
,
(
'
dog
'
,
13
)
,
(
'
cat
'
,
10
)
]
values
es una secuencia de valores por clave. Pasamos cada valor del par RDD (clave, valor) a través de una función de mapeo (añadiendo todos losvalues
consum(values)
) sin cambiar las claves.Para la depuración, devolvemos una lista que contiene todos los elementos de este RDD.
Tenemos varias opciones para agregar y sumar valores: reduceByKey()
y groupByKey()
, por mencionar algunas. En general, la transformaciónreduceByKey()
es más eficaz que la transformación groupByKey()
. En el capítulo 4 encontrarás más detalles al respecto.
Como verás en los siguientes capítulos, Spark dispone de muchas otras potentes transformaciones que pueden convertir un RDD en un nuevo RDD. Como ya se ha mencionado, los RDD son de sólo lectura, inmutables y distribuidos. Las transformaciones de RDD devuelven un puntero a un nuevo RDD y te permiten crear dependencias entre RDDs. Cada RDD de la cadena de dependencias (o cadena de dependencias) tiene una función para calcular sus datos y un puntero (dependencia) a su RDD padre.
Herramientas de análisis de datos para PySpark
- Jupyter
-
Jupyter es una gran herramienta para probar y crear prototipos de programas. PySpark también puede utilizarse desde los cuadernos Jupyter; es muy práctico para el análisis exploratorio de datos.
- Apache Zeppelin
-
Zeppelin es un bloc de notas basado en web que permite realizar análisis de datos interactivos y documentos colaborativos con SQL, Python, Scala, etc.
Ejemplo de ETL con DataFrames
En el análisis y la informática de datos, ETL es el procedimiento general de copiar datos de una o varias fuentes en un sistema de destino que representa los datos de forma diferente a la fuente o fuentes o en un contexto diferente al de la fuente o fuentes. Aquí mostraré cómo Spark hace posible y fácil la ETL.
Para este ejemplo ETL, utilizaré los datos del censo de 2010 en formato JSON(censo_2010.json):
$ wc -l census_2010.json 101 census_2010.json $ head -5 census_2010.json {"females": 1994141, "males": 2085528, "age": 0, "year": 2010} {"females": 1997991, "males": 2087350, "age": 1, "year": 2010} {"females": 2000746, "males": 2088549, "age": 2, "year": 2010} {"females": 2002756, "males": 2089465, "age": 3, "year": 2010} {"females": 2004366, "males": 2090436, "age": 4, "year": 2010}
Nota
Estos datos se extrajeron de los datos de la Oficina del Censo de EE.UU., que en el momento de escribir este libro sólo proporciona las opciones binarias de hombre y mujer. Nos esforzamos por ser lo más inclusivos posible, y esperamos que en el futuro conjuntos de datos nacionales como éstos proporcionen opciones más inclusivas.
Definamos nuestro proceso ETL:
- Extracción
-
Primero, creamos un DataFrame a partir de un documento JSON dado.
- Transformación
-
A continuación, filtramos los datos y nos quedamos con los registros de mayores (
age > 54
). A continuación, añadimos una nueva columna,total
, que es el total de hombres y mujeres. - Cargando
-
Por último, escribimos el DataFrame revisado en una base de datos MySQL y verificamos el proceso de carga.
Profundicemos un poco más en este proceso.
Extracción
Para realizar una extracción adecuada, primero tenemos que crear una instancia de la clase SparkSession
:
from
pyspark.sql
import
SparkSession
spark
=
SparkSession
.
builder
\.
master
(
"local"
)
\.
appName
(
"ETL"
)
\.
getOrCreate
()
A continuación, leemos el JSON y creamos un DataFrame:
>>>
input_path
=
"census_2010.json"
>>>
census_df
=
spark
.
read
.
json
(
input_path
)
>>>
census_df
.
count
()
101
>>>
census_df
.
show
(
200
)
+---+-------+-------+----+
|
age
|
females
|
males
|
year
|
+---+-------+-------+----+
|
0
|
1994141
|
2085528
|
2010
|
|
1
|
1997991
|
2087350
|
2010
|
|
2
|
2000746
|
2088549
|
2010
|
...
|
54
|
2221350
|
2121536
|
2010
|
|
55
|
2167706
|
2059204
|
2010
|
|
56
|
2106460
|
1989505
|
2010
|
...
|
98
|
35778
|
8321
|
2010
|
|
99
|
25673
|
4612
|
2010
|
+---+-------+-------+----+
only
showing
top
100
rows
Transformación
La transformación puede implicar muchos procesos cuya finalidad es limpiar, formatear o realizar cálculos sobre los datos para adaptarlos a tus necesidades. Por ejemplo, puedes eliminar datos que falten o estén duplicados, unir columnas para crear otras nuevas o filtrar determinadas filas o columnas. Una vez que hemos creado el Marco de Datos mediante el proceso de extracción, podemos realizar muchas transformaciones útiles, como seleccionar sólo los mayores:
>>>
seniors
=
census_df
[
census_df
[
'age'
]
>
54
]
>>>
seniors
.
count
()
46
>>>
seniors
.
show
(
200
)
+---+-------+-------+----+
|
age
|
females
|
males
|
year
|
+---+-------+-------+----+
|
55
|
2167706
|
2059204
|
2010
|
|
56
|
2106460
|
1989505
|
2010
|
|
57
|
2048896
|
1924113
|
2010
|
...
|
98
|
35778
|
8321
|
2010
|
|
99
|
25673
|
4612
|
2010
|
|
100
|
51007
|
9506
|
2010
|
+---+-------+-------+----+
A continuación, creamos una nueva columna agregada llamada total
, que suma los números de hombres y mujeres:
>>>
from
pyspark.sql.functions
import
lit
>>>
seniors_final
=
seniors
.
withColumn
(
'total'
,
lit
(
seniors
.
males
+
seniors
.
females
))
>>>
seniors_final
.
show
(
200
)
+---+-------+-------+----+-------+
|
age
|
females
|
males
|
year
|
total
|
+---+-------+-------+----+-------+
|
55
|
2167706
|
2059204
|
2010
|
4226910
|
|
56
|
2106460
|
1989505
|
2010
|
4095965
|
|
57
|
2048896
|
1924113
|
2010
|
3973009
|
...
|
98
|
35778
|
8321
|
2010
|
44099
|
|
99
|
25673
|
4612
|
2010
|
30285
|
|
100
|
51007
|
9506
|
2010
|
60513
|
+---+-------+-------+----+-------+
Cargando
El proceso de carga implica guardar o escribir el resultado final del paso de transformación. Aquí escribiremos el DataFrame seniors_final
en una tabla MySQL:
seniors_final
\.
write
\.
format
(
"jdbc"
)
\.
option
(
"driver"
,
"com.mysql.jdbc.Driver"
)
\.
mode
(
"overwrite"
)
\.
option
(
"url"
,
"jdbc:mysql://localhost/testdb"
)
\.
option
(
"dbtable"
,
"seniors"
)
\.
option
(
"user"
,
"root"
)
\.
option
(
"password"
,
"root_password"
)
\.
save
()
El último paso de la carga es verificar el proceso de carga:
$
mysql
-uroot
-p
Enter
password:
<
password>
Your
MySQL
connection
id
is
9
Server
version:
5
.7.30
MySQL
Community
Server
(
GPL
)
mysql>
use
testdb
;
Database
changed
mysql>
select
*
from
seniors
;
+------+---------+---------+------+---------+
|
age
|
females
|
males
|
year
|
total
|
+------+---------+---------+------+---------+
|
55
|
2167706
|
2059204
|
2010
|
4226910
|
|
56
|
2106460
|
1989505
|
2010
|
4095965
|
|
57
|
2048896
|
1924113
|
2010
|
3973009
|
...
|
98
|
35778
|
8321
|
2010
|
44099
|
|
99
|
25673
|
4612
|
2010
|
30285
|
|
100
|
51007
|
9506
|
2010
|
60513
|
+------+---------+---------+------+---------+
46
rows
in
set
(
0
.00
sec
)
Resumen
Recapitulemos algunos puntos clave del capítulo:
-
Spark es un motor de análisis unificado rápido y potente (hasta cien veces más rápido que el tradicional Hadoop MapReduce) gracias a su funcionamiento en memoria, y ofrece abstracciones de datos robustas, distribuidas y tolerantes a fallos (denominadas RDD y DataFrames). Spark se integra con el mundo del aprendizaje automático y el análisis de grafos a través de los paquetes MLlib (biblioteca de aprendizaje automático) y GraphX (biblioteca de grafos).
-
Puedes utilizar las transformaciones y acciones de Spark en cuatro lenguajes de programación: Java, Scala, R y Python. PySpark (la API de Python para Spark) puede utilizarse para resolver problemas de big data, transformando eficazmente tus datos en el resultado y formato deseados.
-
Los big data pueden representarse utilizando las abstracciones de datos de Spark (RDDs, DataFrames y Datasets, todos ellos conjuntos de datos distribuidos).
-
Puedes ejecutar PySpark desde el shell de PySpark (utilizando el comando
pyspark
desde una línea de comandos) para programar Spark de forma interactiva. Utilizando el shell PySpark, puedes crear y manipular RDDs y DataFrames. -
Puedes enviar una aplicación PySpark autónoma a un clúster Spark utilizando el comando
spark-submit
; las aplicaciones autónomas que utilizan PySpark se implementan en entornos de producción. -
Spark ofrece muchas transformaciones y acciones para resolver problemas de big data, y su rendimiento difiere (por ejemplo,
reduceByKey()
frente agroupByKey()
ycombineByKey()
frente agroupByKey()
).
El siguiente capítulo se sumerge en algunas transformaciones importantes de Spark.
Get Algoritmos de datos con Spark now with the O’Reilly learning platform.
O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.