Capítulo 1. Introducción a Spark y PySpark

Este trabajo se ha traducido utilizando IA. Agradecemos tus opiniones y comentarios: translation-feedback@oreilly.com

Spark es un potente motor de análisis para el procesamiento de datos a gran escala que tiene como objetivo la velocidad, la facilidad de uso y la extensibilidad para aplicaciones de big data. Es una tecnología probada y ampliamente adoptada, utilizada por muchas empresas que manejan big data a diario. Aunque el lenguaje "nativo" de Spark es Scala (la mayor parte de Spark se desarrolla en Scala), también proporciona API de alto nivel en Java, Python y R.

En este libro utilizaremos Python a través de PySpark, una API que expone el modelo de programación Spark a Python. Dado que Python es el lenguaje de programación más accesible y que la API de Spark es potente y expresiva, la sencillez de PySpark hace que sea la mejor opción para nosotros. PySpark es una interfaz para Spark en el lenguaje de programación Python que proporciona las dos características importantes siguientes:

  • Nos permite escribir aplicaciones Spark utilizando las API de Python.

  • Proporciona el shell PySpark para analizar datos de forma interactiva en unentorno distribuido.

El propósito de este capítulo es presentar PySpark como el principal componente del ecosistema Spark y mostrarte que puede utilizarse eficazmente para tareas de big data como operaciones ETL, indexación de miles de millones de documentos, ingesta de millones de genomas, aprendizaje automático, análisis de datos de grafos, análisis de datos de ADN y mucho más. Empezaré revisando las arquitecturas de Spark y PySpark, y daré ejemplos para mostrar la potencia expresiva de PySpark. Presentaré una visión general de las principales funciones (transformaciones y acciones) y conceptos de Spark para que estés capacitado para empezar a utilizar Spark y PySpark de inmediato. Las principales abstracciones de datos de Spark son los conjuntos de datos distribuidos resilientes (RDD), los DataFrames y los Datasets. Como verás, puedes representar tus datos (almacenados como archivos de Hadoop, objetos de Amazon S3, archivos de Linux, estructuras de datos de colecciones, tablas de bases de datos relacionales, etc.) en cualquier combinación de RDDs yDataFrames.

Una vez que tus datos están representados como una abstracción de datos Spark, puedes aplicar transformaciones sobre ellos y crear nuevas abstracciones de datos hasta que los datos tengan la forma final que buscas. Las transformaciones de Spark (como map() y reduceByKey()) pueden utilizarse para convertir tus datos de una forma a otra hasta que obtengas el resultado deseado. En breve explicaré estas abstracciones de datos, pero antes, profundicemos un poco más en por qué Spark es la mejor opción para el análisis de datos.

Por qué Spark para el análisis de datos

Spark es un potente motor de análisis que puede utilizarse para el procesamiento de datos a gran escala. Las razones más importantes para utilizar Spark son:

  • Spark es sencillo, potente y rápido.

  • Spark es gratuito y de código abierto.

  • Spark funciona en todas partes (Hadoop, Mesos, Kubernetes, independiente o en la nube).

  • Spark puede leer/escribir datos desde/a cualquier fuente de datos (Amazon S3, Hadoop HDFS, bases de datos relacionales, etc.).

  • Spark puede integrarse con casi cualquier aplicación de datos.

  • Spark puede leer/escribir datos en formatos basados en filas (como Avro) y en columnas (como Parquet y ORC).

  • Spark dispone de un rico pero sencillo conjunto de API para todo tipo de procesos ETL.

En los últimos cinco años, Spark ha progresado de tal manera que creo que puede utilizarse para resolver cualquier problema de big data. Esto se ve respaldado por el hecho de que todas las empresas de big data, como Facebook, Illumina, IBM y Google, utilizan Spark a diario en sistemas de producción.

Spark es una de las mejores opciones para el procesamiento de datos a gran escala y para resolver problemas deMapReducey más allá, ya que libera el poder de los datos al manejar big data con potentes API y velocidad. Utilizar MapReduce/Hadoop para resolver problemas de big data es complejo, y tienes que escribir una tonelada de código de bajo nivel para resolver incluso problemas primitivos; aquí es donde entra en juego la potencia y simplicidad de Spark.Apache Sparkes considerablemente más rápido que ApacheHadoopporque utiliza almacenamiento en caché en memoria yejecución optimizada para un rendimiento rápido, y es compatible con el procesamiento general por lotes, la analítica de streaming, el aprendizaje automático, los algoritmos de gráficos y lasconsultas SQL.

Para PySpark, Spark tiene dos abstracciones de datos fundamentales: el RDD y el DataFrame. Te enseñaré cómo leer tus datos y representarlos como un RDD (un conjunto de elementos del mismo tipo) o un DataFrame (una tabla de filas con columnas con nombre); esto te permite imponer una estructura a una colección distribuida de datos, permitiendo una abstracción de nivel superior. Una vez representados tus datos como un RDD o un DataFrame, puedes aplicar sobre ellos funciones de transformación (como mapeadores, filtros y reductores) para transformar tus datos en la forma deseada. Te presentaré muchas transformaciones de Spark que puedes utilizar para procesos ETL, análisis y cálculos intensivos de datos.

En la Figura 1-1 se representan algunas transformaciones RDD sencillas.

daws 0101
Figura 1-1. Transformaciones RDD sencillas

Esta figura muestra las siguientes transformaciones:

  1. Primero leemos nuestros datos de entrada (representados como un archivo de texto, ejemplo.txt-aquísólo muestro las dos primeras filas/registros de datos de entrada) con una instancia de SparkSession, que es el punto de entrada a la programación de Spark. La instancia de SparkSession se representa como un objeto spark. La lectura de la entrada crea un nuevo RDD como RDD[String]: cada registro de entrada se convierte en un elemento RDD del tipo String (si tu ruta de entrada tiene N registros, entonces el número de elementos RDD es N). Esto se consigue con el siguiente código:

    # Create an instance of SparkSession
    spark = SparkSession.builder.getOrCreate()
    # Create an RDD[String], which represents all input
    # records; each record becomes an RDD element
    records = spark.sparkContext.textFile("sample.txt")
  2. A continuación, convertimos todos los caracteres a minúsculas. Esto se consigue mediante la transformación map(), que es una transformación de 1 a 1:

    # Convert each element of the RDD to lowercase
    # x denotes a single element of the RDD
    # records: source RDD[String]
    # records_lowercase: target RDD[String]
    records_lowercase = records.map(lambda x: x.lower())
  3. A continuación, utilizamos una transformación flatMap(), que es una transformación de 1 a muchos, para convertir cada elemento (que representa un único registro) en una secuencia de elementos de destino (cada uno de los cuales representa una palabra).La transformación flatMap() devuelve un nuevo RDD aplicando primero una función (aquí, split(",")) a todos los elementos del RDD de origen y aplanando después los resultados:

    # Split each record into a list of words
    # records_lowercase: source RDD[String]
    # words: target RDD[String]
    words = records_lowercase.flatMap(lambda x: x.split(","))
  4. Por último, eliminamos los elementos de palabra con una longitud inferior o igual a 2. La siguiente transformación filter() elimina las palabras no deseadas, manteniendo sólo las que tienen una longitud superior a 2:

    # Keep words with a length greater than 2
    # x denotes a word
    # words: source RDD[String]
    # filtered: target RDD[String]
    filtered = words.filter(lambda x: len(x) > 2)

Como puedes observar, las transformaciones de Spark son de alto nivel, potentes y sencillas. Spark es por naturaleza distribuido y paralelo: tus datos de entrada están particionados y pueden ser procesados por transformaciones (como mapeadores, filtros y reductores) en paralelo en un entorno de clúster. En pocas palabras, para resolver un problema de análisis de datos en PySpark, lees los datos y los representas como un RDD o DataFrame (dependiendo de la naturaleza del formato de los datos), luego escribes un conjunto de transformaciones para convertir tus datos en la salida deseada. Spark particiona automáticamente tus DataFrames y RDDs y distribuye las particiones a través de diferentes nodos de clúster. Las particiones son las unidades básicas del paralelismo en Spark. El paralelismo es lo que permite a los desarrolladores realizar tareas en cientos de servidores informáticos de un clúster de forma paralela e independiente. Una partición en Spark es un trozo (una división lógica) de datos almacenados en un nodo del clúster. Los DataFrames y los RDD son colecciones de particiones. Spark tiene un particionador de datos por defecto para RDDs y DataFrames, pero puedes anular esa partición con tu propiaprogramación personalizada.

A continuación, vamos a profundizar un poco más en el ecosistema y la arquitectura de Spark.

El ecosistema Spark

El ecosistema de Spark se presenta enla Figura 1-2. Tiene tres componentes principales:

Entornos

Spark puede ejecutarse en cualquier lugar y se integra bien con otros entornos.

Aplicaciones

Spark se integra bien con diversas plataformas y aplicaciones de big data.

Fuentes de datos

Spark puede leer y escribir datos desde y hacia muchas fuentes de datos.

daws 0102
Figura 1-2. El ecosistema Spark (fuente: Databricks)

El amplio ecosistema de Spark hace de PySpark una gran herramienta para ETL, análisis de datos y muchas otras tareas. Con PySpark, puedes leer datos de muchas fuentes de datos diferentes (el sistema de archivos de Linux, Amazon S3, el Sistema de Archivos Distribuidos de Hadoop, tablas relacionales, MongoDB, Elasticsearch, archivos Parquet, etc.) y representarlos como una abstracción de datos Spark, como RDDs o DataFrames. Una vez que tus datos tengan esa forma, puedes utilizar una serie de transformaciones Spark sencillas y potentes para transformar los datos en la forma y el formato deseados. Por ejemplo, puedes utilizar la transformación filter() para eliminar los registros no deseados, utilizar groupByKey() para agrupar tus datos por la clave que desees y, por último, utilizar la transformación mapValues() para realizar la agregación final (como hallar la media, la mediana y la desviación estándar de los números) en los datos agrupados. Todas estas transformaciones son muy posibles utilizando la sencilla pero potente API de PySpark.

Arquitectura Spark

Cuando tienes datos pequeños, es posible analizarlos con un solo ordenador en un tiempo razonable. Cuando tienes grandes volúmenes de datos, utilizar un solo ordenador para analizar y procesar esos datos (y almacenarlos) puede ser prohibitivamente lento, o incluso imposible. Por eso queremos utilizar Spark.

Spark tiene una biblioteca central y un conjunto de bibliotecas incorporadas (SQL, GraphX, Streaming, MLlib), como se muestra en la Figura 1-3. Como puedes ver, a través de su API DataSource, Spark puede interactuar con muchas fuentes de datos, como Hadoop, HBase, Amazon S3, Elasticsearch y MySQL, por mencionar algunas.

daws 0103
Figura 1-3. Bibliotecas Spark

Esta figura muestra la verdadera potencia de Spark: puedes utilizar varios lenguajes diferentes para escribir tus aplicaciones Spark, y luego utilizar ricas bibliotecas para resolver diversos problemas de big data. Mientras tanto, puedes leer/escribir datos de diversas fuentes de datos.

Términos clave

Para comprender la arquitectura de Spark, necesitarás entender algunos términos clave:

SparkSession

La clase SparkSession, definida en el paquete pyspark.sql, es el punto de entrada para programar Spark con las APIs Dataset y DataFrame. Para hacer algo útil con un clúster Spark, primero tienes que crear una instancia de esta clase, que te da acceso a una instancia de SparkContext.

Nota

PySparkdispone de una completa API (compuesta por paquetes, módulos, clases y métodos) para acceder a la API de Spark. Es importante tener en cuenta que todas las API, paquetes, módulos, clases y métodos de Spark que se tratan en este libro son específicos de PySpark. Por ejemplo, cuando me refiero a la clase SparkContext me estoy refiriendo a la clase Python pyspark.SparkContext, definida en el paquete pyspark, y cuando me refiero a la clase SparkSession, me estoy refiriendo a la clase Python pyspark.sql.SparkSession, definida en el módulo pyspark.sql.

SparkContext

La clase SparkContext, definida en el paquete pyspark, es el principal punto de entrada a la funcionalidad Spark. Un SparkContextmantiene una conexión con el gestor del clúster Spark y puede utilizarse para crear RDDs y variables de difusión en el clúster. Cuando creas una instancia de SparkSession, la SparkContext pasa a estar disponible dentro de tu sesión como un atributo, SparkSession.sparkContext.

Conductor

Todas las aplicaciones Spark (incluido el shell PySpark y los programas Python independientes) se ejecutan como conjuntos independientes de procesos. Estos procesos están coordinados por un SparkContext en un programa controlador. Para enviar un programa Python independiente a Spark, tienes que escribir un programa controlador con la API PySpark (o Java o Scala). Este programa se encarga del proceso de ejecutar la función main() de la aplicación y de crear el SparkContext. También se puede utilizar para crear RDDs y DataFrames.

Trabajador

En un entorno de clúster Spark, hay dos tipos de nodos: uno (o dos, para alta disponibilidad) maestro y un conjunto de trabajadores. Un trabajador es cualquier nodo que puede ejecutar programas en el clúster. Si se lanza un proceso para una aplicación, esta aplicación adquiere ejecutores en los nodos worker, que se encargan de ejecutar las tareas Spark.

Gestor de clústeres

El nodo "maestro" se conoce como gestor del clúster. La función principal de este nodo es gestionar el entorno del clúster y los servidores que Spark aprovechará para ejecutar las tareas. El gestor de clúster asigna recursos a cada aplicación. Spark admite cinco tipos de gestores de clúster, dependiendo de dónde se esté ejecutando:

  1. Independiente (el propio entorno en clúster incorporado de Spark)

  2. Mesos (un núcleo de sistemas distribuidos)

  3. Hadoop YARN

  4. Kubernetes

  5. Amazon EC2

Nota

Aunque el uso de la terminología maestro/trabajador está pasado de moda y se está retirando en muchos contextos de software, sigue formando parte de la funcionalidad de Apache Spark, que es por lo que utilizo esta terminología en este libro.

La arquitectura Spark en pocas palabras

En la Figura 1-4 se presenta una vista de alto nivel de la arquitectura Spark. Informalmente, un clúster Spark se compone de un nodo maestro (el "gestor del clúster"), que se encarga de gestionar las aplicaciones Spark, y un conjunto de nodos "trabajadores" (ejecutores), que se encargan de ejecutar las tareas enviadas por las aplicaciones Spark (tus aplicaciones, que quieres ejecutar en el clúster Spark).

daws 0104
Figura 1-4. Arquitectura de Spark

Dependiendo del entorno en el que se esté ejecutando Spark, el gestor de clúster que gestione este clúster de servidores será el gestor de clúster independiente de Spark, Kubernetes, Hadoop YARN o Mesos. Cuando el clúster Spark está en funcionamiento, puedes enviar aplicaciones Spark al gestor de clúster, que concederá recursos a tu aplicación para que puedas completar tu análisis de datos.

Tu clúster puede tener uno, decenas, cientos o incluso miles de nodos trabajadores, dependiendo de las necesidades de tu empresa y de los requisitos de tu proyecto. Puedes ejecutar Spark en un servidor independiente, como un MacBook, un Linux o un PC con Windows, peronormalmente, para entornos de producción, Spark se ejecuta en un clúster de servidores Linux. Para ejecutar un programa Spark, necesitas tener acceso a un clúster Spark y disponer de un programa controlador, que declare las transformaciones y acciones sobre RDDs de datos y envíe dichas peticiones al gestor del clúster. En este libro, todos los programas controladores estarán en PySpark.

Cuando inicias una shell PySpark (ejecutando<spark-installed-dir>/bin/pyspark), se definen automáticamente dos variables/objetos:

spark

Una instancia de SparkSession, que es ideal para crear DataFrames

sc

Una instancia de SparkContext, que es ideal para crear RDDs

Si escribes una aplicación PySpark autónoma (un controlador Python, que utiliza la API PySpark), entonces tienes que crear explícitamente tú mismo una instancia deSparkSession. Puedes utilizar un SparkSessionpara:

  • Crear marcos de datos

  • Registrar DataFrames como tablas

  • Ejecutar SQL sobre tablas y tablas caché

  • Lee/escribe texto, CSV, JSON, Parquet y otros formatos de archivo

  • Leer/escribir tablas de bases de datos relacionales

PySpark define SparkSession como:

pyspark.sql.SparkSession (Python class, in pyspark.sql module)
class pyspark.sql.SparkSession(sparkContext,jsparkSession=None)
SparkSession: the entry point to programming Spark with the RDD
and DataFrame API.

Para crear un SparkSession en Python, utiliza el patrón constructor que se muestra aquí:

# import required Spark class
from pyspark.sql import SparkSession 1

# create an instance of SparkSession as spark
spark = SparkSession.builder \ 2
  .master("local") \
  .appName("my-application-name") \
  .config("spark.some.config.option", "some-value") \ 3
  .getOrCreate() 4

# to debug the SparkSession
print(spark.version) 5

# create a reference to SparkContext as sc
# SparkContext is used to create new RDDs
sc = spark.sparkContext 6

# to debug the SparkContext
print(sc)
1

Importa la clase SparkSession del módulo pyspark.sql.

2

Proporciona acceso a la API del Constructor utilizada para construir instancias de SparkSession.

3

Establece una opción de config. Las opciones establecidas mediante este método se propagan automáticamente tanto a SparkConf como a la propia configuración deSparkSession. Al crear un objeto SparkSession, puedes definir cualquier número deconfig(<key>, <value>) opciones.

4

Obtiene un SparkSession existente o, si no hay ninguno, crea uno nuevo basándose en las opciones establecidas aquí.

5

Sólo con fines de depuración.

6

Se puede hacer referencia a un SparkContext desde una instancia de SparkSession.

PySpark define SparkContext como:

class pyspark.SparkContext(master=None, appName=None, ...)
SparkContext: the main entry point for Spark functionality.
A SparkContext represents the connection to a Spark cluster,
and can be used to create RDD (the main data abstraction for
Spark) and broadcast variables (such as collections and data
structures) on that cluster.

SparkContext es el principal punto de entrada a la funcionalidad Spark. Un shell (como el shell de PySpark) o un programa controlador de PySpark no pueden crear más de una instancia de . Un representa la conexión a un clúster Spark, y puede utilizarse para crear nuevos RDD y variables de difusión (estructuras de datos compartidos y colecciones -una especie de variables globales de sólo lectura) en ese clúster. SparkContext SparkContextLa Figura 1-5 muestra cómo se puede utilizar un para crear un nuevo RDD a partir de un archivo de texto de entrada (etiquetado ) y luego transformarlo en otro RDD (etiquetado ) utilizando la transformación . Como puedes observar SparkContext records_rdd words_rdd flatMap() RDD.flatMap(f) devuelve un nuevo RDD aplicando primero una función (f) a todos los elementos del RDD de origen, y luego aplanando los resultados.

daws 0105
Figura 1-5. Creación de RDD por SparkContext

Para crear los objetos SparkSession y SparkContext, utiliza el siguiente patrón:

    # create an instance of SparkSession
    spark_session = SparkSession.builder.getOrCreate()

    # use the SparkSession to access the SparkContext
    spark_context = spark_session.sparkContext

Si vas a trabajar sólo con RDDs, puedes crear una instancia deSparkContext del siguiente modo:

    from pyspark import SparkContext
    spark_context = SparkContext("local", "myapp");

Ahora que ya conoces los fundamentos de Spark, vamos a profundizar un poco más en PySpark.

El poder de PySpark

PySpark es una API de Python para Apache Spark, diseñada para apoyar la colaboración entre Spark y el lenguaje de programación Python. La mayoría de los científicos de datos ya conocen Python, y PySpark les facilita la escritura de código corto y conciso para la computación distribuida mediante Spark. En pocas palabras, es un ecosistema todo en uno que puede gestionar requisitos de datos complejos gracias a su compatibilidad con RDD, DataFrames, GraphFrames, MLlib, SQL y mucho más.

Te mostraré la increíble potencia de PySpark con un sencillo ejemplo. Supongamos que tenemos montones de registros que contienen datos sobre las visitas a URL por parte de los usuarios (recogidos por un motor de búsqueda de muchos servidores web) con el siguiente formato:

<url_address><,><frequency>

Aquí tienes algunos ejemplos de cómo son estos registros:

http://mapreduce4hackers.com,19779
http://mapreduce4hackers.com,31230
http://mapreduce4hackers.com,15708
...
https://www.illumina.com,87000
https://www.illumina.com,58086
...

Supongamos que queremos encontrar la media, la mediana y la desviación típica de los números de visitas por clave (es decir, url_address). Otro requisito es que queremos descartar cualquier registro con una longitud inferior a 5 (ya que puede tratarse de URL malformadas). Es fácil expresar una solución elegante para esto en PySpark, como ilustra la Figura 1-6.

daws 0106
Figura 1-6. Flujo de trabajo sencillo para calcular la media, la mediana y la desviación típica

En primer lugar, vamos a crear algunas funciones básicas de Python que nos ayudarán a resolver nuestro sencillo problema. La primera función, create_pair(), acepta un único registro de la forma <url_address><,><frequency> y devuelve un par (clave, valor) (que nos permitirá hacer unGROUP BY sobre el campo clave más adelante), donde la clave es un url_address y el valor es el frequency asociado:

 # Create a pair of (url_address, frequency)
 # where url_address is a key and frequency is a value
 # record denotes a single element of RDD[String]
 # record: <url_address><,><frequency>
 def create_pair(record): 1
     tokens = record.split(',') 2
     url_address = tokens[0]
     frequency = tokens[1]
     return (url_address, frequency) 3
 #end-def
1

Acepta un registro de la forma <url_address><,><frequency>.

2

Tokeniza el registro de entrada, utilizando el url_address como clave (tokens[0]) y el frequency como valor (tokens[1]).

3

Devuelve un par de (url_address, frequency).

La siguiente función, compute_stats(), acepta una lista de frecuencias (como números) y calcula tres valores, la media, la mediana y la desviación típica:

 # Compute average, median, and standard
 # deviation for a given set of numbers
 import statistics 1
 # frequencies = [number1, number2, ...]
 def compute_stats(frequencies): 2
 	average = statistics.mean(frequencies) 3
 	median = statistics.median(frequencies) 4
 	standard_deviation = statistics.stdev(frequencies) 5
 	return (average, median, standard_deviation) 6
 #end-def
1

Este módulo proporciona funciones para calcular estadísticas matemáticas de datos numéricos.

2

Acepta una lista de frecuencias.

3

Calcula la media de las frecuencias.

4

Calcula la mediana de las frecuencias.

5

Calcula la desviación típica de las frecuencias.

6

Devuelve el resultado como una tripleta.

A continuación, te mostraré la asombrosa potencia de PySpark en unas pocas líneas de código, utilizando transformaciones Spark y nuestras funciones Python personalizadas:

# input_path = "s3://<bucket>/key"
input_path = "/tmp/myinput.txt"
results = spark 1
        .sparkContext 2
        .textFile(input_path) 3
        .filter(lambda record: len(record) > 5) 4
        .map(create_pair) 5
        .groupByKey() 6
        .mapValues(compute_stats) 7
1

spark denota una instancia de SparkSession, el punto de entrada a la programación de Spark.

2

sparkContext (un atributo de SparkSession) es el principal punto de entrada a la funcionalidad Spark.

3

Lee los datos como un conjunto distribuido de registros String(crea un RDD[String]).

4

Elimina los registros con una longitud inferior o igual a 5 (conserva los registros con una longitud superior a 5).

5

Crea pares (url_address, frequency) a partir de los registros de entrada.

6

Agrupa los datos por claves: cada clave (un url_address) se asociará a una lista de frecuencias.

7

Aplica la función compute_stats() a la lista de frecuencias.

El resultado será un conjunto de pares (clave, valor) de la forma

(url_address, (average, median, standard_deviation))

donde url-address es una clave y(average, median, standard_deviation)es un valor.

Nota

Lo más importante de Spark es que maximiza la concurrencia de funciones y operaciones mediante la partición de datos. Considera un ejemplo:

Si tus datos de entrada tienen 600.000 millones de filas y utilizas un clúster de 10 nodos, tus datos de entrada se particionarán en N ( > 1) trozos, que se procesan independientemente y en paralelo. Si N=20,000 (el número de trozos o particiones), entonces cada trozo tendrá unos 30 millones de registros/elementos (600.000.000.000 / 20.000 = 30.000.000). Si tienes un clúster grande, entonces los 20.000 trozos podrían procesarse de una sola vez. Si tienes un clúster más pequeño, puede que sólo se puedan procesar cada 100 trozos de forma independiente y en paralelo. Este proceso continuará hasta que se procesen los 20.000 trozos.

Arquitectura de PySpark

PySpark se construye sobre la API Java de Spark. Los datos se procesan en Python y se almacenan/barajan en caché en la Máquina Virtual Java, o JVM (trataré el concepto de barajar en el Capítulo 2). En la Figura 1-7 se presenta una vista de alto nivel de la arquitectura de PySpark.

daws 0107
Figura 1-7. Arquitectura de PySpark

Y el flujo de datos de PySpark se ilustra en la Figura 1-8.

daws 0108
Figura 1-8. Flujo de datos de PySpark

En el programa del controlador Python (tu aplicación Spark en Python), el SparkContextutiliza Py4Jpara lanzar una JVM, creando unJavaSparkContext. Py4J sólo se utiliza en el controlador para la comunicación local entre los objetos SparkContext de Python y Java; las grandes transferencias de datos se realizan a través de un mecanismo diferente. Las transformaciones RDD en Python se mapean a transformaciones sobre objetosPythonRDD en Java. En las máquinas de trabajo remotas, los objetos PythonRDD lanzan subprocesos Python y se comunican con ellos mediante tuberías, enviando el código del usuario y los datos que deben procesarse.

Nota

Py4J permite a los programas Python que se ejecutan en un intérprete de Python acceder dinámicamente a objetos Java en una JVM. Los métodos se invocan como si los objetos Java residieran en el intérprete de Python, y se puede acceder a las colecciones Java mediante métodos de colección estándar de Python. Py4J también permite a los programas Java devolver llamadas a objetos Python.

Abstracciones de datos Spark

Para manipular datos en el lenguaje de programación Python, utilizas enteros, cadenas, listas y diccionarios. Para manipular y analizar datos en Spark, tienes que representarlos como un conjunto de datos Spark. Spark admite tres tipos de abstracciones de conjuntos de datos:

  • RDD (conjunto de datos distribuidos resilientes):

    • API de bajo nivel

    • Denotado por RDD[T] (cada elemento tiene el tipo T)

  • DataFrame (similar a las tablas relacionales):

    • API de alto nivel

    • Denotado por Table(column_name_1, column_name_2, ...)

  • Conjunto de datos (similar a las tablas relacionales):

    • API de alto nivel (no disponible en PySpark)

La abstracción de datos Dataset se utiliza en lenguajes fuertemente tipados como Java y no está soportada en PySpark. Los RDDs y los DataFrames se tratarán en detalle en los siguientes capítulos, pero aquí haré una breve introducción.

Ejemplos de RDD

Esencialmente, un RDD representa tus datos como una colección de elementos. Es un conjunto inmutable de elementos distribuidos de tipo T, denotado como RDD[T].

La Tabla 1-1 muestra ejemplos de tres tipos sencillos de RDD:

RDD[Integer]

Cada elemento es un Integer.

RDD[String]

Cada elemento es un String.

RDD[(String, Integer)]

Cada elemento es un par de (String, Integer).

Tabla 1-1. RDD simples
RDD[Entero] RDD[Cadena] RDD[(Cadena, Entero)]

2

"abc"

('A', 4)

-730

"fox is red"

('B', 7)

320

"Python is cool"

('ZZ', 9)

...

...

...

La Tabla 1-2 es un ejemplo de RDD complejo. Cada elemento es un par (clave, valor), donde la clave es un String y el valor es una tripleta de (Integer, Integer, Double).

Tabla 1-2. RDD complejo
RDD[(String, (Integer, Integer, Double))]

("cat", (20, 40, 1.8))

("cat", (30, 10, 3.9))

("lion king", (27, 32, 4.5))

("python is fun", (2, 3, 0.6))

...

Operaciones Spark RDD

Los RDD de Spark son de sólo lectura, inmutables y distribuidos. Una vez creados, no pueden alterarse: no puedes añadir registros, borrar registros ni actualizar registros en un RDD. Sin embargo, se pueden transformar. Los RDD admiten dos tipos de operaciones: las transformaciones, que transforman los RDD fuente en uno o varios RDD nuevos, y las acciones, que transforman los RDD fuente en un objeto que no es un RDD, como un diccionario o una matriz. La relación entre RDDs, transformaciones y acciones se ilustra en la Figura 1-9.

daws 0109
Figura 1-9. RDDs, transformaciones y acciones

Entraremos en mucho más detalle sobre las transformaciones de Spark en los capítulos siguientes, con ejemplos prácticos que te ayudarán a comprenderlas, pero aquí te proporcionaré una breve introducción.

Transformaciones

Una transformación en Spark es una función que toma un RDD existente (el RDD de origen), le aplica una transformación y crea un nuevo RDD (el RDD de destino). Algunos ejemplos son: map(),flatMap(), groupByKey(), reduceByKey(), yfilter().

Informalmente, podemos expresar una transformación como:

transformation: source_RDD[V] --> target_RDD[T] 1
1

Transforma source_RDD del tipo V entarget_RDD del tipo T.

Los RDD no se evalúan hasta que se realiza una acción sobre ellos: esto significa que las transformaciones se evalúan perezosamente. Si un RDD falla durante una transformación, el linaje de datos de las transformaciones reconstruye el RDD.

La mayoría de las transformaciones Spark crean un único RDD, pero también es posible que creen varios RDD de destino. Los RDD de destino pueden ser más pequeños, más grandes o del mismo tamaño que el RDD de origen.

El siguiente ejemplo presenta una secuencia de transformaciones:

tuples = [('A', 7), ('A', 8), ('A', -4),
          ('B', 3), ('B', 9), ('B', -1),
          ('C', 1), ('C', 5)]
rdd = spark.sparkContext.parallelize(tuples)

# drop negative values
positives = rdd.filter(lambda x: x[1] > 0)
positives.collect()
[('A', 7), ('A', 8), ('B', 3), ('B', 9), ('C', 1), ('C', 5)]

# find sum and average per key using groupByKey()
sum_and_avg = positives.groupByKey()
    .mapValues(lambda v: (sum(v), float(sum(v))/len(v)))

# find sum and average per key using reduceByKey()
# 1. create (sum, count) per key
sum_count = positives.mapValues(lambda v: (v, 1))
# 2. aggregate (sum, count) per key
sum_count_agg = sum_count.reduceByKey(lambda x, y:
     (x[0]+y[0], x[1]+y[1]))
# 3. finalize sum and average per key
sum_and_avg = sum_count_agg.mapValues(
    lambda v: (v[0], float(v[0])/v[1]))
Consejo

La transformación groupByKey() agrupa los valores de cada clave del RDD en una única secuencia, similar a una sentencia SQL GROUP BY . Esta transformación puede provocar errores de falta de memoria (OOM), ya que los datos se envían a través de la red de servidores Spark y se recopilan en los reductores/trabajadores cuando el número de valores por clave es de miles o millones.

Con la transformación reduceByKey(), sin embargo, los datos se combinan en cada partición, por lo que sólo hay una salida para cada clave en cada partición que enviar a través de la red de servidores Spark. Esto la hace más escalable que groupByKey(). reduceByKey() combina los valores de cada clave utilizando una función de reducción asociativa y conmutativa. Combina todos los valores (por clave) en otro valor con exactamente el mismo tipo de datos (esto es una limitación, que puede superarse utilizando la transformacióncombineByKey() ). En general, reduceByKey() es más escalable que groupByKey(). Hablaremos más de estos temas en el Capítulo 4.

Acciones

Las acciones Spark son operaciones o funciones RDD que producen valores no RDD. Informalmente, podemos expresar una acción como:

action: RDD => non-RDD value

Las acciones pueden desencadenar la evaluación de RDDs (que, como recordarás, se evalúan perezosamente). Sin embargo, la salida de una acción es un valor tangible: un archivo guardado, un valor como un entero, un recuento de elementos, una lista de valores, un diccionario, etc.

Los siguientes son ejemplos de acciones:

reduce()

Aplica una función para obtener un único valor, como sumar valores para un determinado RDD[Integer]

collect()

Convierte un RDD[T] en una lista de tipo T

count()

Encuentra el número de elementos de un RDD dado

saveAsTextFile()

Guarda los elementos del RDD en un disco

saveAsMap()

Guarda los elementos de RDD[(K, V)] en un disco como archivodict[K, V]

Ejemplos de DataFrame

Similar a un RDD, un DataFrame en Spark es una colección distribuida inmutable de datos. Pero a diferencia de un RDD, los datos se organizan en columnas con nombre, como una tabla en una base de datos relacional. Esto se hace para facilitar el procesamiento de grandes conjuntos de datos. Los DataFrames permiten a los programadores imponer una estructura a una colección distribuida de datos, permitiendo una abstracción de mayor nivel. También hacen que el procesamiento de archivos CSV y JSON sea mucho más fácil que con los RDD.

El siguiente ejemplo de DataFrame tiene tres columnas:

DataFrame[name, age, salary]
name: String, age: Integer, salary: Integer

+-----+----+---------+
| name| age|   salary|
+-----+----+---------+
|  bob|  33|    45000|
| jeff|  44|    78000|
| mary|  40|    67000|
|  ...| ...|      ...|
+-----+----+---------+

Un DataFrame puede crearse a partir de muchas fuentes diferentes, como tablas Hive, Archivos de Datos Estructurados (SDF), bases de datos externas o RDD existentes. La API de DataFrames se diseñó para aplicaciones modernas de big data y ciencia de datos, inspirándose en DataFrames en R y pandas en Python. Como veremos en capítulos posteriores, podemos ejecutar consultas SQL contra DataFrames.

Spark SQL viene con un amplio conjunto de potentes operaciones DataFrame que incluye:

  • Funciones de agregación (mín, máx, suma, media, etc.)

  • Funciones de recogida

  • Funciones matemáticas

  • Funciones de clasificación

  • Funciones de cadena

  • Funciones definidas por el usuario (UDF)

Por ejemplo, puedes leer fácilmente un archivo CSV y crear un DataFrame a partir de él:

# define input path
virus_input_path = "s3://mybucket/projects/cases/case.csv"

# read CSV file and create a DataFrame
cases_dataframe = spark.read.load(virus_input_path,format="csv",
   sep=",", inferSchema="true", header="true")

# show the first 3 rows of created DataFrame
cases_dataframe.show(3)
+-------+-------+-----------+--------------+---------+
|case_id|country|       city|infection_case|confirmed|
+-------+-------+-----------+--------------+---------+
|  C0001|    USA|   New York|       contact|      175|
+-------+-------+-----------+--------------+---------+
|  C0008|    USA| New Jersey|       unknown|       25|
+-------+-------+-----------+--------------+---------+
|  C0009|    USA|  Cupertino|       contact|      100|
+-------+-------+-----------+--------------+---------+

Para ordenar los resultados por número de casos en orden descendente, podemos utilizar la función sort():

# We can do this using the F.desc function:
from pyspark.sql import functions as F
cases_dataframe.sort(F.desc("confirmed")).show()
+-------+-------+-----------+--------------+---------+
|case_id|country|       city|infection_case|confirmed|
+-------+-------+-----------+--------------+---------+
|  C0001|    USA|   New York|       contact|      175|
+-------+-------+-----------+--------------+---------+
|  C0009|    USA|  Cupertino|       contact|      100|
+-------+-------+-----------+--------------+---------+
|  C0008|    USA| New Jersey|       unknown|       25|
+-------+-------+-----------+--------------+---------+

También podemos filtrar filas fácilmente:

cases_dataframe.filter((cases_dataframe.confirmed > 100) &
                       (cases_dataframe.country == 'USA')).show()

+-------+-------+-----------+--------------+---------+
|case_id|country|       city|infection_case|confirmed|
+-------+-------+-----------+--------------+---------+
|  C0001|    USA|   New York|       contact|      175|
+-------+-------+-----------+--------------+---------+
...

Para que te hagas una mejor idea de la potencia de los DataFrames de Spark, veamos un ejemplo. Crearemos un DataFrame y hallaremos la media y la suma de horas trabajadas por los empleados por departamento:

# Import required libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, sum

# Create a DataFrame using SparkSession
spark = SparkSession.builder.appName("demo").getOrCreate()
dept_emps = [("Sales", "Barb", 40), ("Sales", "Dan", 20),
             ("IT", "Alex", 22), ("IT", "Jane", 24),
             ("HR", "Alex", 20), ("HR", "Mary", 30)]
df = spark.createDataFrame(dept_emps, ["dept", "name", "hours"])

# Group the same depts together, aggregate their hours, and compute an average
averages = df.groupBy("dept")
   .agg(avg("hours").alias('average'),
        sum("hours").alias('total'))

# Show the results of the final execution
averages.show()
+-----+--------+------+
| dept| average| total|
+-----+--------+------+
|Sales|    30.0|  60.0|
|   IT|    23.0|  46.0|
|   HR|    25.0|  50.0|
+-----+--------+------+

Como puedes ver, los DataFrames de Spark son lo suficientemente potentes como para manipular miles de millones de filas con funciones sencillas pero potentes.

Utilizar la Shell de PySpark

Hay dos formas principales de utilizar PySpark:

  • Utiliza el intérprete de comandos PySpark (para pruebas y programación interactiva).

  • Utiliza PySpark en una aplicación autónoma. En este caso, escribes un programa controlador Python (digamos,mi_programa_pyspark.py) utilizando la API PySpark y luego lo ejecutas con el comando spark-submit:

    export SUBMIT=$SPARK_HOME/bin/spark-submit
    $SUBMIT [options] my_pyspark_program.py <parameters>

    donde <parameters> es una lista de parámetros consumidos por tu programa PySpark(mi_programa_pyspark.py).

Nota

Para más detalles sobre el uso del comando spark-submit, consulta"Envío de solicitudes" en la documentación de Spark.

En esta sección nos centraremos en el shell interactivo de Spark para usuarios de Python, una potente herramienta que puedes utilizar para analizar datos de forma interactiva y ver los resultados inmediatamente (Spark también proporciona un shell Scala). La shell PySpark puede funcionar tanto en instalaciones de una sola máquina como en instalaciones en clúster de Spark. Utiliza el siguiente comando para iniciar el intérprete de comandos, donde SPARK_HOME indica el directorio de instalación de Spark en tu sistema:

export SPARK_HOME=<spark-installation-directory>
$SPARK_HOME/bin/pyspark

Por ejemplo:

export SPARK_HOME="/home/spark" 1
$SPARK_HOME/bin/pyspark 2
Python 3.7.2

Welcome to Spark version 3.1.2
Using Python version 3.7.2
SparkSession available as spark.
SparkContext available as sc
>>>
1

Define el directorio de instalación de Spark.

2

Invoca el intérprete de comandos PySpark.

Cuando inicias el intérprete de comandos, PySpark muestra alguna información útil, como detalles sobre las versiones de Python y Spark que está utilizando (ten en cuenta que aquí se ha acortado la salida). El símbolo >>> se utiliza como indicador del intérprete de comandos PySpark. Este indicador indica que ya puedes escribir comandos Python o PySpark y ver los resultados.

Para que te sientas cómodo con el intérprete de comandos PySpark, las siguientes secciones te guiarán a través de algunos ejemplos de uso básico.

Iniciar la Shell de PySpark

Para entrar en un intérprete de comandos PySpark, ejecutamos pyspark del siguiente modo:

$SPARK_HOME/bin/pyspark  1
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.1.2
      /_/

SparkSession available as 'spark'.
SparkContext available as 'sc'.
>>> sc.version 2
'3.1.2'
>>> spark.version 3
'3.1.2'
1

Ejecutando pyspark se creará un nuevo intérprete de comandos. La salida aquí se ha acortado.

2

Comprueba que SparkContext se crea como sc.

3

Comprueba que SparkSession se crea como spark.

Una vez que entras en el shell de PySpark, se crea una instancia de SparkSession como variable spark y una instancia de SparkContext como variable sc. Como aprendiste anteriormente en este capítulo, el SparkSession es el punto de entrada a la programación de Spark con las APIs Dataset y DataFrame; un SparkSession puede utilizarse para crear DataFrames, registrar DataFrames como tablas, ejecutar SQL sobre tablas, almacenar tablas en caché y leer archivos CSV, JSON y Parquet. Si quieres utilizar PySpark en una aplicación autónoma, entonces tienes que crear explícitamente un SparkSession utilizando el patrón constructor que se muestra en "La arquitectura Spark en pocas palabras". Un SparkContext es el principal punto de entrada a la funcionalidad Spark; puede utilizarse para crear RDDs a partir de archivos de texto y colecciones Python. Lo veremos a continuación.

Crear un RDD a partir de una colección

Spark nos permite crear nuevos RDD a partir de archivos y colecciones (estructuras de datos como matrices y listas). Aquí, utilizamos SparkContext.parallelize() para crear un nuevo RDD a partir de una colección (representada como data):

>>> data = [ 1
    ("fox", 6), ("dog", 5), ("fox", 3), ("dog", 8),
    ("cat", 1), ("cat", 2), ("cat", 3), ("cat", 4)
]

>>># use SparkContext (sc) as given by the PySpark shell
>>># create an RDD as rdd
>>> rdd = sc.parallelize(data) 2
>>> rdd.collect() 3
[
 ('fox', 6), ('dog', 5), ('fox', 3), ('dog', 8),
 ('cat', 1), ('cat', 2), ('cat', 3), ('cat', 4)
]
>>> rdd.count() 4
8
1

Define tu colección Python.

2

Crea un nuevo RDD a partir de una colección Python.

3

Muestra el contenido del nuevo RDD.

4

Cuenta el número de elementos del RDD.

Agregar y fusionar valores de claves

La transformación reduceByKey() se utiliza para fusionar y agregar valores. En este ejemplo, x y yse refieren a los valores de la misma clave:

>>> sum_per_key = rdd.reduceByKey(lambda x, y : x+y) 1
>>> sum_per_key.collect() 2
[
 ('fox', 9),
 ('dog', 13),
 ('cat', 10)
]
1

Fusiona y agrega valores de la misma clave.

2

Recoge los elementos del RDD.

El RDD fuente de esta transformación debe estar formado por pares (clave, valor). reduceByKey() fusiona los valores de cada clave utilizando una función de reducción asociativa y conmutativa. También realizará la fusión localmente en cada mapeador antes de enviar los resultados a un reductor, de forma similar a un "combinador" en MapReduce. La salidase particionará con particiones numPartitions, o el nivel de paralelismo por defecto si no se especifica numPartitions. El particionador por defecto es HashPartitioner.

Si T es el tipo del valor de los pares (clave, valor), entonces reduceByKey()'s func() puede definirse como:

# source_rdd : RDD[(K, T)]
# target_rdd : RDD[(K, T)]
target_rdd = source_rdd.reduceByKey(lambda x, y: func(x, y))
# OR you may write it by passing the function name
# target_rdd = source_rdd.reduceByKey(func)
# where
#      func(T, T) -> T
# Then you may define `func()` in Python as:
# x: type of T
# y: type of T
def func(x, y):
  result = <aggregation of x and y: return a result of type T>
  return result
#end-def

Esto significa que:

  • Hay dos argumentos de entrada (del mismo tipo, T) para el reductor func().

  • El tipo de retorno de func() debe ser el mismo que el tipo de entrada T (esta limitación puede evitarse si utilizas la transformación combineByKey() ).

  • El reductor func() tiene que ser asociativo. Informalmente, una operación binaria f() sobre un conjunto T se denomina asociativa si cumple la ley asociativa, que establece que el orden en que se agrupan los números no cambia el resultado de la operación.

    Derecho Asociativo

    f(f(x, y), z) = f(x, f(y, z))

    Observa que la ley asociativa es válida para la suma (+) y la multiplicación (*), pero no para la resta (-) o la división (/).

  • El reductor func() tiene que ser conmutativo: informalmente, una función f() para la que f(x, y) = f(y, x) para todos los valores de x y y. Es decir, un cambio en el orden de los números no debe afectar al resultado de la operación.

    Ley conmutativa

    f(x, y) = f(y, x)

    La ley conmutativa también es válida para la suma y la multiplicación, pero no para la resta o la división. Por ejemplo:

    5 + 3 = 3 + 5 pero 5 - 3 ≠ 3 - 5

Por tanto, no puedes utilizar operaciones de resta o división en una transformación reduceByKey().

Filtrar los elementos de un RDD

A continuación, utilizaremos la transformación filter() para devolver un nuevo RDD que contenga sólo los elementos que satisfacen un predicado:

>>> sum_filtered = sum_per_key.filter(lambda x : x[1] > 9) 1
>>> sum_filtered.collect() 2
[
 ('cat', 10),
 ('dog', 13)
]
1

Conserva los pares (clave, valor) si el valor es mayor que 9.

2

Recoge los elementos del RDD.

Agrupar claves similares

Podemos utilizar la transformación groupByKey() para agrupar los valores de cada clave del RDD en una única secuencia:

>>> grouped = rdd.groupByKey() 1
>>> grouped.collect() 2
[
 ('fox', <ResultIterable object at 0x10f45c790>), 3
 ('dog', <ResultIterable object at 0x10f45c810>),
 ('cat', <ResultIterable object at 0x10f45cd90>)
]
>>>
>>># list(v) converts v as a ResultIterable into a list
>>> grouped.map(lambda (k,v) : (k, list(v))).collect()  4
[
 ('fox', [6, 3]),
 ('dog', [5, 8]),
 ('cat', [1, 2, 3, 4])
]
1

Agrupa elementos de la misma clave en una secuencia de elementos.

2

Ver el resultado.

3

El nombre completo de ResultIterable es pyspark.resultiterable.ResultIterable.

4

Primero aplica map() y luego collect(), que devuelve una lista que contiene todos los elementos del RDD resultante. La funciónlist() convierte ResultIterable en una lista de objetos.

El RDD fuente de esta transformación debe estar compuesto por pares (clave, valor). groupByKey() agrupa los valores de cada clave del RDD en una única secuencia, y divide en hash el RDD resultante con las particiones de numPartitions, o con el nivel de paralelismo por defecto si no se especifica numPartitions. Ten en cuenta que si estás agrupando (utilizando la transformacióngroupByKey() ) para realizar una agregación, como una suma o una media, sobre cada clave, utilizar reduceByKey()o aggregateByKey() proporcionará un rendimiento mucho mejor.

Agregar valores para claves similares

Para agregar y sumar los valores de cada clave, podemos utilizar la transformación mapValues() y la función sum():

>>> aggregated = grouped.mapValues(lambda values : sum(values)) 1
>>> aggregated.collect() 2
[
 ('fox', 9),
 ('dog', 13),
 ('cat', 10)
]
1

values es una secuencia de valores por clave. Pasamos cada valor del par RDD (clave, valor) a través de una función de mapeo (añadiendo todos los valuescon sum(values)) sin cambiar las claves.

2

Para la depuración, devolvemos una lista que contiene todos los elementos de este RDD.

Tenemos varias opciones para agregar y sumar valores: reduceByKey() y groupByKey(), por mencionar algunas. En general, la transformaciónreduceByKey() es más eficaz que la transformación groupByKey(). En el capítulo 4 encontrarás más detalles al respecto.

Como verás en los siguientes capítulos, Spark dispone de muchas otras potentes transformaciones que pueden convertir un RDD en un nuevo RDD. Como ya se ha mencionado, los RDD son de sólo lectura, inmutables y distribuidos. Las transformaciones de RDD devuelven un puntero a un nuevo RDD y te permiten crear dependencias entre RDDs. Cada RDD de la cadena de dependencias (o cadena de dependencias) tiene una función para calcular sus datos y un puntero (dependencia) a su RDD padre.

Herramientas de análisis de datos para PySpark

Jupyter

Jupyter es una gran herramienta para probar y crear prototipos de programas. PySpark también puede utilizarse desde los cuadernos Jupyter; es muy práctico para el análisis exploratorio de datos.

Apache Zeppelin

Zeppelin es un bloc de notas basado en web que permite realizar análisis de datos interactivos y documentos colaborativos con SQL, Python, Scala, etc.

Ejemplo de ETL con DataFrames

En el análisis y la informática de datos, ETL es el procedimiento general de copiar datos de una o varias fuentes en un sistema de destino que representa los datos de forma diferente a la fuente o fuentes o en un contexto diferente al de la fuente o fuentes. Aquí mostraré cómo Spark hace posible y fácil la ETL.

Para este ejemplo ETL, utilizaré los datos del censo de 2010 en formato JSON(censo_2010.json):

$ wc -l census_2010.json
101 census_2010.json

$ head -5 census_2010.json
{"females": 1994141, "males": 2085528, "age": 0, "year": 2010}
{"females": 1997991, "males": 2087350, "age": 1, "year": 2010}
{"females": 2000746, "males": 2088549, "age": 2, "year": 2010}
{"females": 2002756, "males": 2089465, "age": 3, "year": 2010}
{"females": 2004366, "males": 2090436, "age": 4, "year": 2010}
Nota

Estos datos se extrajeron de los datos de la Oficina del Censo de EE.UU., que en el momento de escribir este libro sólo proporciona las opciones binarias de hombre y mujer. Nos esforzamos por ser lo más inclusivos posible, y esperamos que en el futuro conjuntos de datos nacionales como éstos proporcionen opciones más inclusivas.

Definamos nuestro proceso ETL:

Extracción

Primero, creamos un DataFrame a partir de un documento JSON dado.

Transformación

A continuación, filtramos los datos y nos quedamos con los registros de mayores (age > 54). A continuación, añadimos una nueva columna, total, que es el total de hombres y mujeres.

Cargando

Por último, escribimos el DataFrame revisado en una base de datos MySQL y verificamos el proceso de carga.

Profundicemos un poco más en este proceso.

Extracción

Para realizar una extracción adecuada, primero tenemos que crear una instancia de la clase SparkSession:

from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master("local") \
    .appName("ETL") \
    .getOrCreate()

A continuación, leemos el JSON y creamos un DataFrame:

>>> input_path = "census_2010.json"
>>> census_df = spark.read.json(input_path)
>>> census_df.count()
101
>>> census_df.show(200)
+---+-------+-------+----+
|age|females|  males|year|
+---+-------+-------+----+
|  0|1994141|2085528|2010|
|  1|1997991|2087350|2010|
|  2|2000746|2088549|2010|
...
| 54|2221350|2121536|2010|
| 55|2167706|2059204|2010|
| 56|2106460|1989505|2010|
...
| 98|  35778|   8321|2010|
| 99|  25673|   4612|2010|
+---+-------+-------+----+
only showing top 100 rows

Transformación

La transformación puede implicar muchos procesos cuya finalidad es limpiar, formatear o realizar cálculos sobre los datos para adaptarlos a tus necesidades. Por ejemplo, puedes eliminar datos que falten o estén duplicados, unir columnas para crear otras nuevas o filtrar determinadas filas o columnas. Una vez que hemos creado el Marco de Datos mediante el proceso de extracción, podemos realizar muchas transformaciones útiles, como seleccionar sólo los mayores:

>>> seniors = census_df[census_df['age'] > 54]
>>> seniors.count()
46
>>> seniors.show(200)
+---+-------+-------+----+
|age|females|  males|year|
+---+-------+-------+----+
| 55|2167706|2059204|2010|
| 56|2106460|1989505|2010|
| 57|2048896|1924113|2010|
...
| 98|  35778|   8321|2010|
| 99|  25673|   4612|2010|
|100|  51007|   9506|2010|
+---+-------+-------+----+

A continuación, creamos una nueva columna agregada llamada total, que suma los números de hombres y mujeres:

>>> from pyspark.sql.functions import lit
>>> seniors_final = seniors.withColumn('total',
  lit(seniors.males + seniors.females))
>>> seniors_final.show(200)
+---+-------+-------+----+-------+
|age|females|  males|year|  total|
+---+-------+-------+----+-------+
| 55|2167706|2059204|2010|4226910|
| 56|2106460|1989505|2010|4095965|
| 57|2048896|1924113|2010|3973009|
...
| 98|  35778|   8321|2010|  44099|
| 99|  25673|   4612|2010|  30285|
|100|  51007|   9506|2010|  60513|
+---+-------+-------+----+-------+

Cargando

El proceso de carga implica guardar o escribir el resultado final del paso de transformación. Aquí escribiremos el DataFrame seniors_final en una tabla MySQL:

seniors_final\
  .write\
  .format("jdbc")\
  .option("driver", "com.mysql.jdbc.Driver")\
  .mode("overwrite")\
  .option("url", "jdbc:mysql://localhost/testdb")\
  .option("dbtable", "seniors")\
  .option("user", "root")\
  .option("password", "root_password")\
  .save()

El último paso de la carga es verificar el proceso de carga:

$ mysql -uroot -p
Enter password: <password>
Your MySQL connection id is 9
Server version: 5.7.30 MySQL Community Server (GPL)

mysql> use testdb;
Database changed
mysql> select * from seniors;
+------+---------+---------+------+---------+
| age  | females | males   | year | total   |
+------+---------+---------+------+---------+
|   55 | 2167706 | 2059204 | 2010 | 4226910 |
|   56 | 2106460 | 1989505 | 2010 | 4095965 |
|   57 | 2048896 | 1924113 | 2010 | 3973009 |
...
|   98 |   35778 |    8321 | 2010 |   44099 |
|   99 |   25673 |    4612 | 2010 |   30285 |
|  100 |   51007 |    9506 | 2010 |   60513 |
+------+---------+---------+------+---------+
46 rows in set (0.00 sec)

Resumen

Recapitulemos algunos puntos clave del capítulo:

  • Spark es un motor de análisis unificado rápido y potente (hasta cien veces más rápido que el tradicional Hadoop MapReduce) gracias a su funcionamiento en memoria, y ofrece abstracciones de datos robustas, distribuidas y tolerantes a fallos (denominadas RDD y DataFrames). Spark se integra con el mundo del aprendizaje automático y el análisis de grafos a través de los paquetes MLlib (biblioteca de aprendizaje automático) y GraphX (biblioteca de grafos).

  • Puedes utilizar las transformaciones y acciones de Spark en cuatro lenguajes de programación: Java, Scala, R y Python. PySpark (la API de Python para Spark) puede utilizarse para resolver problemas de big data, transformando eficazmente tus datos en el resultado y formato deseados.

  • Los big data pueden representarse utilizando las abstracciones de datos de Spark (RDDs, DataFrames y Datasets, todos ellos conjuntos de datos distribuidos).

  • Puedes ejecutar PySpark desde el shell de PySpark (utilizando el comando pyspark desde una línea de comandos) para programar Spark de forma interactiva. Utilizando el shell PySpark, puedes crear y manipular RDDs y DataFrames.

  • Puedes enviar una aplicación PySpark autónoma a un clúster Spark utilizando el comando spark-submit; las aplicaciones autónomas que utilizan PySpark se implementan en entornos de producción.

  • Spark ofrece muchas transformaciones y acciones para resolver problemas de big data, y su rendimiento difiere (por ejemplo,reduceByKey() frente a groupByKey()y combineByKey() frente a groupByKey()).

El siguiente capítulo se sumerge en algunas transformaciones importantes de Spark.

Get Algoritmos de datos con Spark now with the O’Reilly learning platform.

O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.