Capítulo 4. Visión general de la API estructurada

Este trabajo se ha traducido utilizando IA. Agradecemos tus opiniones y comentarios: translation-feedback@oreilly.com

Esta parte del libro será una inmersión profunda en las API Estructuradas de Spark. Las API Estructuradas son una herramienta para manipular todo tipo de datos, desde archivos de registro no estructurados hasta archivos CSV semiestructurados y archivos Parquet altamente estructurados. Estas API hacen referencia a tres tipos básicos de API de recopilación distribuida:

  • Conjuntos de datos

  • Marcos de datos

  • Tablas y vistas SQL

Aunque son partes distintas del libro, la mayoría de las API Estructuradas se aplican tanto al cálculo por lotes como al streaming. Esto significa que, cuando trabajes con las API Estructuradas, debería ser sencillo pasar de la computación por lotes al streaming (o viceversa) sin apenas esfuerzo. Trataremos el streaming en detalle en la Parte V.

Las API estructuradas son la abstracción fundamental que utilizarás para escribir la mayoría de tus flujos de datos. Hasta ahora en este libro, hemos adoptado un enfoque basado en tutoriales, serpenteando por gran parte de lo que ofrece Spark. Esta parte ofrece una exploración más profunda. En este capítulo, presentaremos los conceptos fundamentales que debes comprender: las API tipadas y no tipadas (y sus diferencias); cuál es la terminología básica; y, por último, cómo Spark toma realmente tus flujos de datos de la API estructurada y los ejecuta en el clúster. A continuación, proporcionaremos información más específica basada en tareas para trabajar con determinados tipos de datos o fuentes de datos.

Nota

Antes de proceder a, repasemos los conceptos y definiciones fundamentales que hemos tratado en la Parte I. Spark es un modelo de programación distribuida en el que el usuario especifica transformaciones. Múltiples transformaciones construyen un grafo acíclico dirigido de instrucciones. Una acción inicia el proceso de ejecución de ese grafo de instrucciones, como un único trabajo, descomponiéndolo en etapas y tareas que se ejecutarán en todo el clúster. Las estructuras lógicas que manipulamos con transformaciones y acciones son los DataFrames y los Datasets. Para crear un nuevo DataFrame o Dataset, se llama a una transformación. Para iniciar un cálculo o convertir a tipos de lenguaje nativo, llamas a una acción.

Marcos de datos y conjuntos de datos

En la Parte I se habló de DataFrames. Spark tiene dos nociones de colecciones estructuradas: DataFrames y Datasets. En breve abordaremos las diferencias (matizadas), pero definamos primero lo que ambos representan.

Los DataFrames y los Datasets son colecciones (distribuidas) similares a tablas con filas y columnas bien definidas. Cada columna debe tener el mismo número de filas que todas las demás columnas (aunque puedes utilizar null para especificar la ausencia de un valor) y cada columna tiene información de tipo que debe ser coherente para cada fila de la colección. Para Spark, los DataFrames y los Datasets representan planes inmutables, perezosamente evaluados, que especifican qué operaciones aplicar a los datos que residen en una ubicación para generar algún resultado. Cuando realizamos una acción en un DataFrame, ordenamos a Spark que realice las transformaciones reales y devuelva el resultado. Representan planes de cómo manipular filas y columnas para calcular el resultado deseado por el usuario.

Nota

Las tablas y las vistas son básicamente lo mismo que los DataFrames. Simplemente ejecutamos SQL contra ellas en lugar de código DataFrame. Cubrimos todo esto en el Capítulo 10, que se centra específicamente en Spark SQL.

Para añadir un poco más de especificidad a estas definiciones, tenemos que hablar de esquemas, que son la forma en que defines los tipos de datos que vas a almacenar en esta colección distribuida.

Esquemas

Un esquema define los nombres y tipos de columna de un Marco de datos. Puedes definir esquemas manualmente o leer un esquema de una fuente de datos (lo que a menudo se denomina esquema en lectura). Los esquemas se componen de tipos, lo que significa que necesitas una forma de especificar qué se encuentra dónde.

Visión general de los tipos de Spark estructurado

Spark es efectivamente un lenguaje de programación propio. Internamente, Spark utiliza un motor llamado Catalyst que mantiene su propia información de tipos a través de la planificación y el procesamiento del trabajo. Al hacerlo, abre una amplia variedad de optimizaciones de ejecución que marcan diferencias significativas. Los tipos de Spark se mapean directamente a las diferentes API de lenguaje que mantiene Spark y existe una tabla de consulta para cada una de ellas en Scala, Java, Python, SQL y R. Incluso si utilizamos las API estructuradas de Spark desde Python o R, la mayoría de nuestras manipulaciones operarán estrictamente sobre tipos de Spark, no sobre tipos de Python. Por ejemplo, el siguiente código no realiza la suma en Scala o Python; en realidad realiza la suma puramente en Spark:

// in Scala
val df = spark.range(500).toDF("number")
df.select(df.col("number") + 10)
# in Python
df = spark.range(500).toDF("number")
df.select(df["number"] + 10)

Esta operación de adición se produce porque Spark convertirá una expresión escrita en un lenguaje de entrada en la representación Catalyst interna de Spark de esa misma información de tipo. A continuación, operará sobre esa representación interna. Enseguida veremos por qué ocurre esto, pero antes tenemos que hablar de los Conjuntos de Datos.

Marcos de datos frente a conjuntos de datos

En esencia, dentro de las API Estructuradas, hay dos API más, los DataFrames "no tipados" y los Datasets "tipados". Decir que los DataFrames son no tipados es un poco inexacto; tienen tipos, pero Spark los mantiene completamente y sólo comprueba si esos tipos coinciden con los especificados en el esquema en tiempo de ejecución. Los Conjuntos de datos, en cambio, comprueban si los tipos se ajustan a la especificación en tiempo de compilación. Los conjuntos de datos sólo están disponibles para los lenguajes basados en la Máquina Virtual Java (JVM) (Scala y Java) y especificamos los tipos con clases case o beans Java.

Para lo más probable es que trabajes con DataFrames. Para Spark (en Scala), los DataFrames son simplemente conjuntos de datos de tipo Row. El tipo "Fila" es la representación interna de Spark de su formato optimizado en memoria para el cálculo. Este formato permite una computación altamente especializada y eficiente, ya que en lugar de utilizar tipos de la JVM, que pueden provocar elevados costes de recogida de basura e instanciación de objetos, Spark puede operar con su propio formato interno sin incurrir en ninguno de esos costes. Para Spark (en Python o R), no existen los Conjuntos de Datos: todo es un DataFrame y, por tanto, siempre operamos sobre ese formato optimizado.

Nota

El formato interno de Catalyst está bien tratado en numerosas presentaciones de Spark. Dado que este libro está destinado a un público más general, nos abstendremos de entrar en la implementación. Si tienes curiosidad, hay algunas charlas excelentes de Josh Rosen y Herman van Hovell, ambos de Databricks, sobre su trabajo en el desarrollo del motor Catalyst de Spark.

Comprender los DataFrames, los Tipos y los Esquemas de Spark lleva algún tiempo digerirlos. Lo que necesitas saber es que cuando utilizas DataFrames, estás aprovechando el formato interno optimizado de Spark. Este formato aplica las mismas ganancias de eficiencia a todas las API de lenguaje de Spark. Si necesitas una comprobación estricta en tiempo de compilación, lee el Capítulo 11 para saber más sobre ello.

Pasemos a conceptos más amables y accesibles: columnas y filas.

Columnas

Las columnas representan un tipo simple como un entero o una cadena, un tipo complejo como una matriz o un mapa, o un valor nulo. Spark rastrea toda esta información de tipo por ti y ofrece una variedad de formas, con las que puedes transformar las columnas. Las columnas se tratan ampliamente en el Capítulo 5, pero en su mayor parte puedes pensar en los tipos de Spark Column como columnas de una tabla.

Filas

Una fila no es más que un registro de datos. Cada registro de un DataFrame debe ser del tipo Row, como podemos comprobar al recopilar los siguientes DataFrames. Podemos crear estas filas manualmente a partir de SQL, de Conjuntos de Datos Distribuidos Resistentes (RDD), de fuentes de datos o manualmente desde cero. Aquí, creamos una utilizando un rango:

// in Scala
spark.range(2).toDF().collect()
# in Python
spark.range(2).collect()

Ambas dan como resultado una matriz de objetos Row.

Tipos de chispa

Antes hemos mencionado en que Spark tiene un gran número de representaciones internas de tipos. Incluimos una práctica tabla de referencia en las siguientes páginas para que puedas consultar con mayor facilidad qué tipo, en tu lenguaje específico, se corresponde con el tipo en Spark.

Antes de que llegue a esas tablas, hablemos de cómo instanciamos, o declaramos, que una columna es de un tipo determinado.

Para trabajar con los tipos Scala correctos, utiliza lo siguiente:

import org.apache.spark.sql.types._
val b = ByteType

Para trabajar con los tipos Java correctos, debes utilizar los métodos de fábrica del siguiente paquete:

import org.apache.spark.sql.types.DataTypes;
ByteType x = DataTypes.ByteType;

Los tipos de Python tienen a veces ciertos requisitos, que puedes ver enumerados en la Tabla 4-1, al igual que Scala y Java, que puedes ver enumerados en las Tablas 4-2 y 4-3, respectivamente. Para trabajar con los tipos Python correctos, utiliza lo siguiente:

from pyspark.sql.types import *
b = ByteType()

Las tablas siguientes proporcionan la información detallada de los tipos para cada uno de los enlaces lingüísticos de Spark.

Tabla 4-1. Referencia de tipos en Python
Tipo de datos Tipo de valor en Python API para acceder o crear un tipo de datos

ByteType

int o long. Nota: Los números se convertirán a números enteros con signo de 1 byte en tiempo de ejecución. Asegúrate de que los números están dentro del intervalo de -128 a 127.

ByteType()

TipoCorto

int o long. Nota: Los números se convertirán a números enteros con signo de 2 bytes en tiempo de ejecución. Asegúrate de que los números están dentro del intervalo de -32768 a 32767.

TipoCorto()

TipoEntero

int o long. Nota: Python tiene una definición poco estricta de "entero". Los números demasiado grandes serán rechazados por Spark SQL si utilizas IntegerType(). Es una buena práctica utilizar LongType.

TipoEntero()

TipoLargo

largo. Nota: Los números se convertirán a números enteros con signo de 8 bytes en tiempo de ejecución. Asegúrate de que los números están dentro del rango de -9223372036854775808 a 9223372036854775807. En caso contrario, convierte los datos a decimal.Decimal y utiliza DecimalType.

TipoLargo()

TipoFlotante

float. Nota: Los números se convertirán a números de coma flotante de precisión simple de 4 bytes en tiempo de ejecución.

TipoFlot()

TipoDoble

float

TipoDoble()

TipoDecimal

decimal.Decimal

TipoDecimal()

TipoCadena

cadena

TipoCadena()

TipoBinario

bytearray

TipoBinario()

Tipo booleano

bool

BooleanType()

TimestampType

datetime.datetime

TimestampType()

TipoFecha

datetime.date

TipoFecha()

TipoArray

lista, tupla o matriz

ArrayType(elementType, [containsNull]). Nota: El valor por defecto de containsNull es True.

Tipo de mapa

dic

MapType(tipoClave, tipoValor, [valorContieneNulo]). Nota: El valor por defecto de valorContieneNulo es Verdadero.

TipoEstructura

lista o tupla

TipoEstructura(campos). Nota: fields es una lista de StructFields. Además, no se permiten campos con el mismo nombre.

StructField

El tipo de valor en Python del tipo de datos de este campo (por ejemplo, Int para un StructField con el tipo de datos IntegerType)

StructField(name, dataType, [nullable]) Nota: El valor por defecto de nullable es True.

Tabla 4-2. Referencia de tipos Scala
Tipo de datos Tipo de valor en Scala API para acceder o crear un tipo de datos

ByteType

Byte

ByteType

TipoCorto

Corto

TipoCorto

TipoEntero

Int

TipoEntero

TipoLargo

Largo

TipoLargo

TipoFlotante

Flotador

TipoFlotante

TipoDoble

Doble

TipoDoble

TipoDecimal

java.math.BigDecimal

TipoDecimal

TipoCadena

Cadena

TipoCadena

TipoBinario

Matriz[Byte]

TipoBinario

Tipo booleano

Booleano

Tipo booleano

TimestampType

java.sql.Timestamp

TimestampType

TipoFecha

java.sql.Fecha

TipoFecha

TipoArray

scala.collection.Seq

ArrayType(elementType, [containsNull]). Nota: El valor por defecto de containsNull es verdadero.

Tipo de mapa

scala.collection.Mapa

MapType(tipoClave, tipoValor, [valorContieneNulo]). Nota: El valor por defecto de valorContieneNulo es verdadero.

TipoEstructura

org.apache.spark.sql.Fila

TipoEstructura(campos). Nota: fields es una matriz de StructFields. Además, no se permiten campos con el mismo nombre.

StructField

El tipo de valor en Scala del tipo de datos de este campo (por ejemplo, Int para un StructField con el tipo de datos IntegerType)

StructField(nombre, tipoDatos, [anulable]). Nota: El valor por defecto de nullable es true.

Tabla 4-3. Referencia de tipos Java
Tipo de datos Tipo de valor en Java API para acceder o crear un tipo de datos

ByteType

byte o Byte

TiposDatos.TipoByte

TipoCorto

corto o Corto

TiposDatos.TipoCorto

TipoEntero

int o Entero

TiposDatos.TipoEntero

TipoLargo

largo o Long

TiposDatos.TipoLargo

TipoFlotante

flotador o Flotador

TiposDatos.TipoFlotante

TipoDoble

doble o Doble

TiposDatos.DobleTipo

TipoDecimal

java.math.BigDecimal

TiposDatos.crearTipoDecimal() TiposDatos.crearTipoDecimal(precisión, escala).

TipoCadena

Cadena

TiposDatos.TipoCadena

TipoBinario

byte[]

TiposDatos.TipoBinario

Tipo booleano

booleano o booleana

TiposDatos.BooleanType

TimestampType

java.sql.Timestamp

TiposDatos.TimestampType

TipoFecha

java.sql.Fecha

TiposDatos.TipoFecha

TipoArray

java.util.Lista

DataTypes.createArrayType(elementType). Nota: El valor de containsNull será verdadero DataTypes.createArrayType(elementType, containsNull).

Tipo de mapa

java.util.Map

TiposDatos.createMapType(tipoClave, tipoValor). Nota: El valor de valorContieneNulo será verdadero. TiposDatos.crearTipoMapa(tipoClave, tipoValor, valorContieneNulo)

TipoEstructura

org.apache.spark.sql.Fila

TiposDatos.crearTipoEstructura(campos). Nota: fields es una Lista o una matriz de StructFields. Además, no se permiten dos campos con el mismo nombre.

StructField

El tipo de valor en Java del tipo de datos de este campo (por ejemplo, int para un StructField con el tipo de datos IntegerType)

DataTypes.createStructField(nombre, dataType, nullable)

Merece la pena tener en cuenta que los tipos podrían cambiar con el tiempo a medida que Spark SQL siga creciendo, por lo que es posible que quieras consultar la documentación de Spark para futuras actualizaciones. Por supuesto, todos estos tipos son geniales, pero casi nunca trabajas con DataFrames puramente estáticos. Siempre los manipularás y transformarás. Por eso es importante que te demos una visión general del proceso de ejecución en las API Estructuradas.

Visión general de la ejecución estructurada de la API

Esta sección de demostrará cómo se ejecuta realmente este código en un clúster. Esto te ayudará a entender (y potencialmente a depurar) el proceso de escribir y ejecutar código en clusters, así que vamos a recorrer la ejecución de una única consulta estructurada a la API desde el código de usuario hasta el código ejecutado. He aquí un resumen de los pasos:

  1. Escribir código DataFrame/Dataset/SQL.

  2. Si el código es válido, Spark lo convierte en un Plan Lógico.

  3. Spark transforma este Plan Lógico en un Plan Físico, comprobando las optimizaciones por el camino.

  4. A continuación, Spark ejecuta este Plan Físico (manipulaciones RDD) en el clúster.

Para ejecutar código, debemos escribir código. A continuación, este código se envía a Spark a través de la consola o mediante un trabajo enviado. A continuación, este código pasa por el Optimizador Catalyst, que decide cómo debe ejecutarse el código y establece un plan para hacerlo antes de que, finalmente, se ejecute el código y se devuelva el resultado al usuario. La Figura 4-1 muestra el proceso.

image
Figura 4-1. El Optimizador de Catalizadores

Planificación lógica

La primera fase de ejecución de tiene por objeto tomar el código del usuario y convertirlo en un plan lógico. La Figura 4-2 ilustra este proceso.

image
Figura 4-2. El proceso estructurado de planificación lógica de la API

Este plan lógico de sólo representa un conjunto de transformaciones abstractas que no hacen referencia a ejecutores o controladores, sino que se limita a convertir el conjunto de expresiones del usuario en la versión más optimizada. Para ello, convierte el código del usuario en un plan lógico no resuelto. Este plan está sin resolver porque, aunque tu código sea válido, las tablas o columnas a las que hace referencia pueden existir o no. Spark utiliza el catálogo, un repositorio de toda la información sobre tablas y DataFrame, para resolver columnas y tablas en el analizador. El analizador podría rechazar el plan lógico no resuelto si el nombre de la tabla o columna requerida no existe en el catálogo. Si el analizador puede resolverlo, el resultado se pasa por el Optimizador Catalyst, una colección de reglas que intentan optimizar el plan lógico empujando hacia abajo predicados o selecciones. Los paquetes pueden ampliar el Catalizador para incluir sus propias reglas para optimizaciones específicas del dominio.

Planificación física

Después de que cree con éxito un plan lógico optimizado, Spark inicia entonces el proceso de planificación física. El plan físico, a menudo llamado plan Spark, especifica cómo se ejecutará el plan lógico en el clúster generando diferentes estrategias de ejecución física y comparándolas mediante un modelo de costes, como se muestra en la Figura 4-3. Un ejemplo de la comparación de costes podría ser elegir cómo realizar una determinada unión fijándose en los atributos físicos de una tabla determinada (cómo de grande es la tabla o cómo de grandes son sus particiones).

image
Figura 4-3. El proceso de planificación física

La planificación física da como resultado una serie de RDD y transformaciones. Este resultado es la razón por la que puede que hayas oído referirse a Spark como un compilador: toma consultas en DataFrames, Datasets y SQL y las compila en transformaciones RDD por ti.

Ejecución

Al seleccionar un plan físico, Spark ejecuta todo este código sobre RDDs, la interfaz de programación de nivel inferior de Spark (que cubrimos en la Parte III). Spark realiza más optimizaciones en tiempo de ejecución, generando bytecode Java nativo que puede eliminar tareas o etapas enteras durante la ejecución. Finalmente, el resultado se devuelve al usuario.

Conclusión

En este capítulo, cubrimos las API Estructuradas de Spark y cómo Spark transforma tu código en lo que se ejecutará físicamente en el clúster. En los capítulos siguientes, cubrimos los conceptos básicos y cómo utilizar la funcionalidad clave de las API Estructuradas.

Get Spark: La Guía Definitiva now with the O’Reilly learning platform.

O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.