Capítulo 4. Dask DataFrame
Este trabajo se ha traducido utilizando IA. Agradecemos tus opiniones y comentarios: translation-feedback@oreilly.com
Los Pandas DataFrames, aunque populares, se encuentran rápidamente con limitaciones de memoria a medida que crece el tamaño de los datos, ya que almacenan la totalidad de los datos en memoria. Los Pandas DataFrames tienen una sólida API para todo tipo de manipulación de datos y suelen ser el punto de partida de muchos proyectos de análisis y aprendizaje automático. Aunque pandas en sí no tiene incorporado el aprendizaje automático, los científicos de datos suelen utilizarlo como parte de la preparación de datos y características durante la fase exploratoria de nuevos proyectos. Por ello, escalar los DataFrames de pandas para que puedan manejar grandes conjuntos de datos es de vital importancia para muchos científicos de datos. La mayoría de los científicos de datos ya están familiarizados con las bibliotecas pandas, y el DataFrame de Dask implementa gran parte de la API pandas añadiendo la capacidad de escalado.
Dask es uno de los primeros en implementar un subconjunto utilizable de las APIs de pandas, pero otros proyectos como Spark han añadido sus enfoques. Este capítulo asume que tienes un buen conocimiento de las APIs pandas DataFrame; si no es así, deberías consultar Python para el Análisis de Datos.
A menudo puedes utilizar Dask DataFrames como sustituto de pandas DataFrames con pequeños cambios, gracias a duck-typing. Sin embargo, este enfoque puede tener inconvenientes de rendimiento, y algunas funciones no están presentes. Estos inconvenientes provienen de la naturaleza paralela distribuida de Dask, que añade costes de comunicación para ciertos tipos de operaciones. En este capítulo, aprenderás a minimizar estos inconvenientes de rendimiento y a solucionar cualquier funcionalidad que falte.
Los DataFrames de Dask requieren que tus datos y tu cálculo se adapten bien a los DataFrames de pandas. Dask tiene bolsas para datos no estructurados, matrices para datos estructurados en matrices, la interfaz retardada de Dask para funciones arbitrarias y actores para operaciones con estado. Si ni siquiera a pequeña escala considerarías utilizar pandas para tu problema, probablemente los DataFrames de Dask no sean la solución adecuada.
Cómo se construyen los DataFrames Dask
Los DataFrames de Dask se construyen sobre de los DataFrames de pandas. Cada partición se almacena como un DataFrame de pandas.1 Utilizar pandas DataFrames para las particiones simplifica la implementación de gran parte de las APIs. Esto es especialmente cierto para las operaciones basadas en filas, en las que Dask pasa la llamada a la función a cada DataFrame de pandas.
La mayoría de los componentes distribuidos de Los DataFrames de Dask utilizan los tres bloques básicos map_partitions
, reduction
, y rolling
. La mayoría de las veces no necesitarás llamar directamente a estas funciones; en su lugar, utilizarás APIs de nivel superior. Pero comprender estas funciones y cómo funcionan es importante para entender cómo funciona Dask. shuffle
es un bloque de construcción crítico de los DataFrames distribuidos para reorganizar tus datos. A diferencia de los otros bloques de construcción, es posible que lo utilices directamente con más frecuencia, ya que Dask no puede abstraer el particionamiento.
Carga y escritura
El análisis de datos sólo es tan valioso como los datos a los que tiene acceso, y nuestras percepciones sólo son útiles si se traducen en acciones. Como no todos nuestros datos están en Dask, es esencial leer y escribir datos del resto del mundo. Hasta ahora, los ejemplos de este libro han utilizado principalmente colecciones locales, pero tienes muchas más opciones.
Dask admite la lectura y escritura de muchos formatos de archivo y sistemas de archivos estándar. Estos formatos incluyen CSV, HDF, ancho fijo, Parquet y ORC. Dask admite muchos de los sistemas de archivos distribuidos estándar, desde HDFS a S3, y la lectura desde sistemas de archivos normales.
Lo más importante para Dask es que los sistemas de archivos distribuidos permiten que varios ordenadores lean y escriban en el mismo conjunto de archivos. Los sistemas de archivos distribuidos suelen almacenar datos en varios ordenadores, lo que permite almacenar más datos de los que puede contener un solo ordenador. A menudo, aunque no siempre, los sistemas de archivos distribuidos también son tolerantes a fallos (lo que consiguen mediante la replicación). Los sistemas de archivos distribuidos pueden tener importantes diferencias de rendimiento respecto a aquellos con los que estás acostumbrado a trabajar, por lo que es importante que eches un vistazo a la documentación de usuario de los sistemas de archivos que utilices. Algunas cosas que debes tener en cuenta son el tamaño de los bloques (a menudo no querrás escribir archivos más pequeños que éstos, ya que el resto es espacio desperdiciado), la latencia y las garantías de coherencia.
Consejo
La lectura de archivos locales normales puede ser complicada en Dask, ya que los archivos deben existir en todos los trabajadores. Si un archivo sólo existe en el nodo principal, considera la posibilidad de copiarlo a un sistema de archivos distribuido como S3 o NFS, o cárgalo localmente y utiliza la función client.scatter
de Dask para distribuir los datos si son lo suficientemente pequeños. Los archivos suficientemente pequeños pueden ser una señal de que aún no necesitas Dask, a menos que el procesamiento en ellos sea complejo o lento.
Formatos
Las funciones de carga de DataFrame de Dask y de escritura de empiezan con to_
o read_
como prefijos. Cada formato tiene su propia configuración, pero en general, el primer argumento posicional es la ubicación de los datos que se van a leer. La ubicación puede ser una ruta comodín de archivos (por ejemplo, s3://test-bucket/magic/*), una lista de archivos o una ubicación de archivo normal.
Nota
Las rutas con comodines sólo funcionan con sistemas de archivos que admitan el listado de directorios. Por ejemplo, no funcionan en HTTP.
Al cargar datos, tener el número correcto de particiones acelerará todas tus operaciones. A veces no es posible cargar los datos con el número correcto de particiones, y en esos casos puedes volver a particionar tus datos después de la carga. Como ya hemos dicho, más particiones permiten más paralelismo, pero tienen una sobrecarga distinta de cero. Los distintos formatos tienen formas ligeramente diferentes de controlar esto. HDF toma chunksize
, que indica el número de filas por partición. Parquet también toma split_row_groups
, que toma un número entero de la partición lógica deseada del archivo Parquet, y Dask dividirá todo el conjunto en esos trozos, o menos. Si no se da, el comportamiento por defecto es que cada partición corresponda a un archivo Parquet. Los formatos basados en texto (CSV, ancho fijo, etc.) toman un parámetro blocksize
con el mismo significado que chunksize
de Parquet, pero con un valor máximo de 64 MB. Puedes comprobarlo cargando un conjunto de datos y viendo que el número de tareas y particiones aumenta con tamaños de destino más pequeños, como en el Ejemplo 4-1.
Ejemplo 4-1. Dask DataFrame cargando CSV con trozos de 1 KB
many_chunks
=
dd
.
read_csv
(
url
,
blocksize
=
"1kb"
)
many_chunks
.
index
Cargar archivos CSV y JSON puede ser más complicado que Parquet, y otros tipos de datos autodescriptivos no tienen codificada ninguna información de esquema. Los DataFrames de Dask necesitan conocer los tipos de las distintas columnas para serializar los datos correctamente. Por defecto, Dask mirará automáticamente los primeros registros y adivinará los tipos de datos de cada columna. Este proceso se conoce como inferencia de esquema, y puede ser bastante lento.
Por desgracia, la inferencia de esquemas no siempre funciona. Por ejemplo, si intentas cargar los datos de disparidad salarial entre hombres y mujeres del Reino Unido desde https://gender-pay-gap.service.gov.uk/viewing/download-data/2021, cuando accedas a los datos, como en el Ejemplo 4-2, obtendrás un error de "Se han encontrado dtypes no coincidentes en pd.read_csv
/pd.read_table
." Cuando la inferencia del tipo de columna de Dask es incorrecta, puedes anularla (por columna) especificando el parámetro dtype
, como se muestra en el Ejemplo 4-3.
Ejemplo 4-2. Dask DataFrame cargando CSV, dependiendo totalmente de la inferencia
df
=
dd
.
read_csv
(
"https://gender-pay-gap.service.gov.uk/viewing/download-data/2021"
)
Ejemplo 4-3. Dask DataFrame cargando CSV y especificando el tipo de datos
df
=
dd
.
read_csv
(
"https://gender-pay-gap.service.gov.uk/viewing/download-data/2021"
,
dtype
=
{
'CompanyNumber'
:
'str'
,
'DiffMeanHourlyPercent'
:
'float64'
})
Nota
En teoría, puedes hacer que Dask muestree más registros especificando más bytes con el parámetro sample
, pero esto no soluciona actualmente el problema. El código de muestreo actual no respeta estrictamente el número de bytes solicitados.
Incluso cuando la inferencia de esquemas no devuelve un error, depender de ella tiene una serie de inconvenientes. La inferencia de esquemas implica el muestreo de datos, por lo que sus resultados son probabilísticos y lentos. Siempre que puedas, debes utilizar formatos autodescriptivos o evitar la inferencia de esquemas; la carga de tus datos será más rápida y fiable. Algunos formatos autodescriptivos comunes que puedes encontrar son Parquet, Avro y ORC.
Leer y escribir desde/a nuevos formatos de archivo es mucho trabajo, sobre todo si no existen bibliotecas de Python. Si existen bibliotecas, puede que te resulte más fácil leer los datos en bruto en una bolsa y analizarlos con una función map
, que exploraremos más a fondo en el próximo capítulo.
Consejo
Dask no detecta los datos ordenados en la carga. En su lugar, si tienes datos preclasificados, añade el parámetro sorted=true
al establecer un índice para aprovechar tus datos ya clasificados, un paso que conocerás en la siguiente sección. Sin embargo, si especificas esto cuando los datos no están ordenados, puede que se produzca una corrupción silenciosa de los datos.
También puedes conectar Dask a bases de datos o microservicios . Las bases de datos relacionales son una herramienta fantástica y, a menudo, ofrecen un gran rendimiento en lecturas y escrituras sencillas. A menudo, las bases de datos relacionales admiten la implementación distribuida, en la que los datos se dividen en varios nodos, y esto se utiliza sobre todo con grandes conjuntos de datos. Las bases de datos relacionales suelen ser excelentes para gestionar transacciones a escala, pero ejecutar capacidades analíticas en el mismo nodo puede plantear problemas. Dask puede utilizarse para leer y calcular eficientemente sobre bases de datos SQL.
Puedes utilizar el soporte incorporado de Dask para cargar bases de datos SQL utilizando SQLAlchemy. Para que Dask divida la consulta en varias máquinas, tienes que darle una clave de índice. A menudo, las bases de datos SQL tendrán una clave primaria o una clave índice numérica que puedes utilizar para este fin (por ejemplo, read_sql_table("customers", index_col="customer_id")
). Un ejemplo de esto se muestra en el Ejemplo 4-4.
Ejemplo 4-4. Leer desde y escribir en SQL con Dask DataFrame
from
sqlite3
import
connect
from
sqlalchemy
import
sql
import
dask.dataframe
as
dd
#sqlite connection
db_conn
=
"sqlite://fake_school.sql"
db
=
connect
(
db_conn
)
col_student_num
=
sql
.
column
(
"student_number"
)
col_grade
=
sql
.
column
(
"grade"
)
tbl_transcript
=
sql
.
table
(
"transcripts"
)
select_statement
=
sql
.
select
([
col_student_num
,
col_grade
]
)
.
select_from
(
tbl_transcript
)
#read from sql db
ddf
=
dd
.
read_sql_query
(
select_stmt
,
npartitions
=
4
,
index_col
=
col_student_num
,
con
=
db_conn
)
#alternatively, read whole table
ddf
=
dd
.
read_sql_table
(
"transcripts"
,
db_conn
,
index_col
=
"student_number"
,
npartitions
=
4
)
#do_some_ETL...
#save to db
ddf
.
to_sql
(
"transcript_analytics"
,
uri
=
db_conn
,
if_exists
=
'replace'
,
schema
=
None
,
index
=
False
)
Las conexiones más avanzadas a bases de datos o microservicios se realizan mejor utilizando la interfaz de la bolsa y escribiendo tu código de carga personalizado, sobre el que aprenderás más en el próximo capítulo.
Sistemas de archivos
Cargar datos puede ser una cantidad sustancial de trabajo y un cuello de botella, por lo que Dask distribuye esto como la mayoría de las demás tareas. Si utilizas Dask distribuido, cada trabajador debe tener acceso a los archivos para paralelizar la carga. En lugar de copiar el archivo a cada trabajador, los sistemas de archivos de red permiten que todos accedan a los archivos. La capa de acceso a archivos de Dask utiliza la biblioteca FSSPEC (del proyecto intake) para acceder a los distintos sistemas de archivos. Como FSSPEC admite una serie de sistemas de archivos, no instala los requisitos de cada sistema de archivos admitido. Utiliza el código del Ejemplo 4-5 para ver qué sistemas de archivos son compatibles y cuáles necesitan paquetes adicionales.
Ejemplo 4-5. Obtener una lista de sistemas de archivos compatibles con FSSPEC
from
fsspec.registry
import
known_implementations
known_implementations
Muchos sistemas de archivos requieren algún tipo de configuración en , ya sea el punto final o las credenciales. A menudo, los nuevos sistemas de archivos, como MinIO, ofrecen API compatibles con S3, pero sobrecargan el punto final y requieren algún tipo de configuración adicional para funcionar. Con Dask especificas los parámetros de configuración a la función de lectura/escritura con storage_options
. La configuración de cada uno aquí será probablemente un poco diferente.2 Dask utilizará tustorage_options
dict como argumentos de palabra clave para la implementación subyacente de FSSPEC. Por ejemplo, mi storage_options
para MinIO se muestra en el Ejemplo 4-6.
Ejemplo 4-6. Configurar Dask para hablar con MinIO
minio_storage_options
=
{
"key"
:
"YOURACCESSKEY"
,
"secret"
:
"YOURSECRETKEY"
,
"client_kwargs"
:
{
"endpoint_url"
:
"http://minio-1602984784.minio.svc.cluster.local:9000"
,
"region_name"
:
'us-east-1'
},
"config_kwargs"
:
{
"s3"
:
{
"signature_version"
:
's3v4'
}},
}
Indexación
La indexación en un DataFrame es una de las potentes características de pandas, pero viene con algunas restricciones cuando se traslada a un sistema distribuido como Dask. Como Dask no hace un seguimiento del tamaño de cada partición, no admite la indexación posicional por filas. Puedes utilizar la indexación posicional por columnas, así como la indexación por etiquetas para columnas o filas.
La indexación se utiliza con frecuencia para filtrar los datos y tener sólo los componentes que necesitas. Hicimos esto para los datos COVID-19 de San Francisco mirando sólo las tasas de casos para personas de todos los estados de vacunación, como se muestra en el Ejemplo 4-7.
Ejemplo 4-7. Indexación Dask DataFrame
mini_sf_covid_df
=
(
sf_covid_df
[
sf_covid_df
[
'vaccination_status'
]
==
'All'
]
[[
'specimen_collection_date'
,
'new_cases'
]])
Si realmente necesitas indexación posicional por filas, puedes implementar la tuya propia calculando el tamaño de cada partición y utilizándolo para seleccionar los subconjuntos de particiones deseados. Esto es muy ineficiente, por lo que Dask evita implementarlo directamente; haz una elección intencionada antes de hacerlo.
Baraja
Como se mencionó en el capítulo anterior de , las barajadas son caras. Las causas principales de la naturaleza costosa de las barajadas son la sobrecarga de serialización al mover datos entre procesos y la lentitud comparativa de las redes en relación con la lectura de datos de la memoria. Estos costes aumentan a medida que aumenta la cantidad de datos que se barajan, por lo que Dask dispone de técnicas para reducir la cantidad de datos que se barajan. Estas técnicas dependen de determinadas propiedades de los datos o de la operación que se realice.
Ventanas rodantes y map_overlap
Una situación que puede desencadenar la necesidad de un barajado es una ventana rodante, en la que en los perímetros de una partición tu función necesita algunos registros de sus vecinos. Dask DataFrame tiene una función especial map_overlap
en la que puedes especificar una ventana de búsqueda posterior (también llamada ventana de búsqueda anterior ) y una ventana de búsqueda posterior (también llamada ventana de búsqueda posterior ) de filas a transferir (ya sea un entero o un delta de tiempo). El ejemplo más sencillo que aprovecha esta ventaja es una media móvil, que se muestra en el Ejemplo 4-8.
Ejemplo 4-8. Media móvil Dask DataFrame
def
process_overlap_window
(
df
):
return
df
.
rolling
(
'5D'
)
.
mean
()
rolling_avg
=
partitioned_df
.
map_overlap
(
process_overlap_window
,
pd
.
Timedelta
(
'5D'
),
0
)
Utilizar map_overlap
permite a Dask transferir sólo los datos necesarios. Para que esta implementación funcione correctamente, el tamaño mínimo de tu partición debe ser mayor que el de tu ventana más grande.
Advertencia
Las ventanas móviles de Dask no atraviesan varias particiones. Si tu DataFrame está particionado de forma que el look-after o look-back es mayor que la longitud de la partición del vecino, los resultados fallarán o serán incorrectos. Dask lo comprueba en el caso de los intervalos de tiempo delta, pero no lo hace en el caso de los intervalos de retorno o de los intervalos enteros.
Una técnica eficaz pero costosa para sortear el look-ahead/look-behind de una sola partición de Dask es repartition
, tus DataFrames de Dask.
Agregaciones
Las agregaciones son otro caso especial de que puede reducir la cantidad de datos que hay que transferir por la red. Las agregaciones son funciones que combinan registros. Si vienes de un entorno map/reduce o Spark, reduceByKey
es la agregación clásica. Las agregaciones pueden ser "por clave" o ser globales en todo un DataFrame.
Para agregar por clave, primero tienes que llamar a groupby
con la(s) columna(s) que representa(n) la clave, o la función de clave para agregar. Por ejemplo, llamar a df.groupby("PostCode")
agrupa tu DataFrame por código postal, o llamar a df.groupby(["PostCode", "SicCodes"])
utiliza una combinación de columnas para la agrupación. En cuanto a las funciones, están disponibles muchos de los mismos agregados de pandas, pero el rendimiento de los agregados en Dask es muy diferente del de los DataFrames locales de pandas.
Consejo
Si estás agregando por clave de partición, Dask puede calcular la agregación sin necesidad de barajar.
La primera forma de acelerar tus agregaciones es reducir las columnas sobre las que estás agregando, ya que los datos más rápidos de procesar son los que no existen. Por último, siempre que sea posible, realizar varias agregaciones al mismo tiempo reduce el número de veces que hay que barajar los mismos datos. Por tanto, si necesitas calcular la media y el máximo, debes calcular ambos al mismo tiempo (ver Ejemplo 4-9).
Ejemplo 4-9. Dask DataFrame máx y media
dask
.
compute
(
raw_grouped
[[
"new_cases"
]]
.
max
(),
raw_grouped
[[
"new_cases"
]]
.
mean
())
En sistemas distribuidos como Dask, si una agregación puede evaluarse parcialmente y luego fusionarse, puedes combinar potencialmente algunos registros antes de la fusión. No todas las agregaciones parciales son iguales. Lo que importa con las agregaciones parciales es que la cantidad de datos se reduce al fusionar valores con la misma clave en comparación con los múltiples valores originales.
Las agregaciones más eficientes ocupan una cantidad de espacio sublineal independientemente del número de registros. Algunas de ellas, como suma, recuento, primero, mínimo, máximo, media y desviación típica, pueden ocupar un espacio constante. Las tareas más complicadas, como cuantiles y recuentos distintos, también tienen opciones de aproximación sublineal. Estas opciones de aproximación pueden ser estupendas, ya que las respuestas exactas pueden requerir un crecimiento lineal del almacenamiento.3
Algunas funciones de agregación no tienen un crecimiento sublineal, sino que tienden a crecer o no demasiado rápido. Contar los valores distintos está en este grupo, pero si todos tus valores son únicos no hay ahorro de espacio.
Para aprovechar las ventajas de las agregaciones eficientes de , tienes que utilizar una agregación incorporada de Dask, o escribir la tuya propia utilizando la clase de agregación de Dask. Siempre que puedas, utiliza una incorporada. Las incorporadas no sólo requieren menos esfuerzo, sino que además suelen ser más rápidas. No todos los agregados de pandas están soportados directamente en Dask, así que a veces tu única opción es escribir tu propio agregado.
Si decides escribir tu propio agregado, tienes que definir tres funciones: chunk
para manejar cada grupo-partición/tramo, agg
para combinar los resultados de chunk
entre particiones, y (opcionalmente) finalize
para tomar el resultado de agg
y producir un valor final.
La forma más rápida de entender cómo utilizar la agregación parcial es viendo un ejemplo que utilice las tres funciones. Utilizar la media ponderada del Ejemplo 4-10 puede ayudarte a pensar en lo que se necesita para cada función. La primera función necesita calcular los valores ponderados y los pesos. La función agg
los combina sumando cada parte lateral de la tupla. Por último, la función finalize
divide el total entre las ponderaciones.
Ejemplo 4-10. Agregado personalizado Dask
# Write a custom weighted mean, we get either a DataFrameGroupBy
# with multiple columns or SeriesGroupBy for each chunk
def
process_chunk
(
chunk
):
def
weighted_func
(
df
):
return
(
df
[
"EmployerSize"
]
*
df
[
"DiffMeanHourlyPercent"
])
.
sum
()
return
(
chunk
.
apply
(
weighted_func
),
chunk
.
sum
()[
"EmployerSize"
])
def
agg
(
total
,
weights
):
return
(
total
.
sum
(),
weights
.
sum
())
def
finalize
(
total
,
weights
):
return
total
/
weights
weighted_mean
=
dd
.
Aggregation
(
name
=
'weighted_mean'
,
chunk
=
process_chunk
,
agg
=
agg
,
finalize
=
finalize
)
aggregated
=
(
df_diff_with_emp_size
.
groupby
(
"PostCode"
)
[
"EmployerSize"
,
"DiffMeanHourlyPercent"
]
.
agg
(
weighted_mean
))
En algunos casos, como con una suma pura, no necesitas hacer ningún postprocesamiento en la salida de agg
, por lo que puedes omitir la función finalize
.
No todas las agregaciones deben ser por clave; también puedes calcular agregaciones en todas las filas. Sin embargo, la interfaz de agregación personalizada de Dask sólo se expone con operaciones por clave.
Las agregaciones completas de DataFrame incorporadas en Dask utilizan una interfaz de nivel inferior llamada apply_contact_apply
para agregaciones parciales. En lugar de aprender dos API diferentes para las agregaciones parciales, preferimos hacer un groupby
estático proporcionando una función de agrupación constante. De este modo, sólo tenemos que conocer una interfaz para las agregaciones. Puedes utilizarla para encontrar los números COVID-19 agregados en todo el DataFrame, como se muestra en el Ejemplo 4-11.
Ejemplo 4-11. Agregar todo el Marco de datos
raw_grouped
=
sf_covid_df
.
groupby
(
lambda
x
:
0
)
Cuando existe una agregación incorporada, es probable que sea mejor que cualquier cosa que pudiéramos escribir. A veces una agregación parcial está parcialmente implementada, como en el caso del HyperLogLog de Dask: sólo está implementado para DataFrames completos. A menudo puedes traducir agregaciones sencillas utilizando apply_contact_apply
o aca
copiando la función chunk
, utilizando el parámetro combine
para agg
, y utilizando el parámetro aggregate
para finalize
. Esto se muestra portando la implementación de HyperLogLog de Dask en el Ejemplo 4-12.
Ejemplo 4-12. Envolver el HyperLogLog de Dask en dd.Aggregation
# Wrap Dask's hyperloglog in dd.Aggregation
from
dask.dataframe
import
hyperloglog
approx_unique
=
dd
.
Aggregation
(
name
=
'approx_unique'
,
chunk
=
hyperloglog
.
compute_hll_array
,
agg
=
hyperloglog
.
reduce_state
,
finalize
=
hyperloglog
.
estimate_count
)
aggregated
=
(
df_diff_with_emp_size
.
groupby
(
"PostCode"
)
[
"EmployerSize"
,
"DiffMeanHourlyPercent"
]
.
agg
(
weighted_mean
))
Las agregaciones lentas/ineficientes (o las que tienen muchas probabilidades de provocar una excepción por falta de memoria) utilizan un almacenamiento proporcional a los registros que se están agregando. Ejemplos de este grupo lento son hacer una lista y calcular ingenuamente cuantiles exactos.4
Con estos agregados lentos, utilizar la clase de agregación de Dask no tiene ninguna ventaja sobre la API apply
, que quizá quieras utilizar por simplicidad. Por ejemplo, si sólo quisieras una lista de ID de empleador por código postal, en lugar de tener que escribir tres funciones podrías utilizar una de una sola línea como df.groupby("PostCode")["EmployerId"].apply(lambda g: list(g))
. Dask implementa la función apply
como una barajada completa, que se trata en la siguiente sección.
Barajado completo y particionado
Si una operación parece ser más lenta en dentro de Dask de lo que esperarías trabajando en DataFrames locales, puede deberse a que requiere una barajada completa. Un ejemplo de esto es la ordenación, que es intrínsecamente cara en los sistemas distribuidos porque suele requerir una mezcla. Las barajadas completas son a veces una parte inevitable del trabajo en Dash. En contra de la intuición, aunque las barajadas completas son lentas en sí mismas, puedes utilizarlas para acelerar futuras operaciones que se realicen sobre la(s) misma(s) clave(s) de agrupación. Como se mencionó en la sección de agregación, una de las formas de desencadenar una barajada completa es utilizar el método apply
cuando la partición no está alineada.
Partición
Lo más habitual es que utilices barajados completos para reparticionar tus datos. Es importante tener la partición correcta cuando se trata de agregaciones, ventanas móviles o búsquedas/índices. Como se ha comentado en la sección de ventanas móviles, Dask no puede realizar más de una partición de búsqueda anticipada o retrospectiva, por lo que es necesario tener la partición correcta para obtener los resultados correctos. Para la mayoría de las demás operaciones, tener un particionado incorrecto ralentizará tu trabajo.
Dask dispone de tres métodos principales para controlar la partición de un DataFrame: set_index
, repartition
, y shuffle
(ver Tabla 4-1). Utiliza set_index
cuando cambie la partición a una nueva clave/índice. repartition
mantiene la misma clave/índice pero cambia las particiones. repartition
y set_index
toman parámetros similares, y repartition
no toma un nombre de clave de índice. En general, si no vas a cambiar la columna utilizada para el índice, debes utilizar repartition
. shuffle
es un poco diferente, ya que no produce un esquema de partición conocido que puedan aprovechar operaciones como groupby
.
Método | Cambia la clave del índice | Establece el número de particiones | Resultados en un esquema de partición conocido | Caso de uso ideal |
---|---|---|---|---|
|
Sí |
Sí |
Sí |
Cambiar la clave del índice |
|
No |
Sí |
Sí |
Aumentar/disminuir el número de particiones |
|
No |
Sí |
No |
Distribución sesgada de la clavea |
a Hace un hash de la clave para su distribución, lo que puede ayudar a distribuir aleatoriamente datos sesgados si las claves son únicas (pero agrupadas). |
El primer paso para conseguir la partición adecuada para tu Marco de Datos es decidir si quieres un índice. Los índices son útiles cuando se filtran datos por un valor indexado, se indexan, se agrupan y para casi cualquier otra operación por clave. Una de estas operaciones por clave sería una groupby
, en la que la columna sobre la que se agrupa podría ser una buena candidata para la clave. Si utilizas una ventana móvil sobre una columna, esa columna debe ser la clave, lo que hace que elegir la clave sea relativamente fácil. Una vez que te hayas decidido por un índice, puedes llamar a set_index
con el nombre de la columna del índice (por ejemplo, set_index("PostCode")
). Esto, en la mayoría de las circunstancias, dará lugar a un barajado, por lo que es un buen momento para dimensionar tus particiones.
Consejo
Si no estás seguro de cuál es la clave utilizada actualmente para particionar, puedes consultar la propiedad index
para ver la clave de partición.
Una vez elegida la clave, la siguiente cuestión es cómo dimensionar las particiones. El consejo de "Particionar/reunir colecciones" suele aplicarse aquí: intenta que haya suficientes particiones para mantener ocupada a cada máquina, pero ten en cuenta el punto óptimo general de 100 MB a 1 GB. Dask suele calcular particiones bastante uniformes si le das un número objetivo de particiones.5 Por suerte, set_index
también acepta npartitions
. Para repartir los datos por código postal, con 10 particiones, añadirías set_index("PostCode", npartitions=10)
; de lo contrario, Dask utilizará por defecto el número de particiones de entrada.
Si piensas utilizar ventanas móviles, probablemente tendrás que asegurarte de que tienes el tamaño adecuado (en términos de rango de claves) cubierto en cada partición. Para hacer esto como parte de set_index
, tendrías que calcular tus propias divisiones para asegurarte de que cada partición tiene el rango correcto de registros presentes. Las divisiones se especifican como una lista que empieza por el valor mínimo de la primera partición hasta el valor máximo de la última. Cada valor intermedio es un punto de "corte" entre los DataFrames pandas que componen el DataFrame Dask. Para hacer un DataFrame con las particiones [0, 100) [100, 200), [200, 300), [300, 500)
, escribirías df.set_index("NumEmployees", divisions=[0, 100, 200, 300, 500])
. Del mismo modo, para que el intervalo de fechas admita una ventana móvil de hasta siete días desde aproximadamente el inicio de la pandemia COVID-19 hasta hoy, consulta el Ejemplo 4-13.
Ejemplo 4-13. Ventana rodante DataFrame de Dask con set_index
divisions
=
pd
.
date_range
(
start
=
"2021-01-01"
,
end
=
datetime
.
today
(),
freq
=
'7D'
)
.
tolist
()
partitioned_df_as_part_of_set_index
=
mini_sf_covid_df
.
set_index
(
'specimen_collection_date'
,
divisions
=
divisions
)
Advertencia
Dask, incluso para las ventanas temporales rodantes, asume que tu índice de partición es monotónicamente creciente.6
Hasta ahora, has tenido que especificar el número de particiones o las divisiones concretas, pero quizá te preguntes si Dask puede averiguarlo por sí mismo. Afortunadamente, la función repartición de Dask tiene la capacidad de elegir divisiones para un tamaño objetivo determinado, como se muestra en el Ejemplo 4-14. Sin embargo, hacer esto tiene un coste no trivial, ya que Dask debe evaluar el DataFrame además de la propia repartición.
Ejemplo 4-14. Partición automática de Dask DataFrame
reparted
=
indexed
.
repartition
(
partition_size
=
"20kb"
)
Advertencia
set_index
de Dask tiene un parámetro similar partition_size
pero, en el momento de escribir esto, sólo funciona para reducir el número de particiones.
Como has visto al principio de este capítulo, al escribir un Marco de Datos, cada partición recibe su propio archivo, pero a veces esto puede dar lugar a archivos demasiado grandes o demasiado pequeños. Algunas herramientas sólo aceptan un archivo como entrada, por lo que tienes que reparticionar todo en una única partición. Otras veces, el sistema de almacenamiento de datos está optimizado para un determinado tamaño de archivo, como el tamaño de bloque por defecto de HDFS de 128 MB. La buena noticia es que las técnicas como repartition
y set_index
resuelven estos problemas por ti.
Operaciones embarazosamente paralelas
La función map_partitions
de Dask aplica una función a cada una de las particiones subyacentes a los DataFrames de pandas, y el resultado es también un DataFrame de pandas. Las funciones implementadas con map_partitions
son vergonzosamente paralelas, ya que no requieren ninguna transferencia de datos entre trabajadores.7
Dask implementa map
con map_partitions
, así como muchas operaciones por filas. Si quieres utilizar una operación por filas que te parezca que falta, puedes implementarla tú mismo, como se muestra en el Ejemplo 4-15.
Ejemplo 4-15. Marco de datos Dask fillna
def
fillna
(
df
):
return
df
.
fillna
(
value
=
{
"PostCode"
:
"UNKNOWN"
})
.
fillna
(
value
=
0
)
new_df
=
df
.
map_partitions
(
fillna
)
# Since there could be an NA in the index clear the partition / division
# information
new_df
.
clear_divisions
()
No estás limitado a llamar a las funciones integradas de pandas. Siempre que tu función tome y devuelva un DataFrame, puedes hacer prácticamente lo que quieras dentro de map_partitions
.
La API completa de pandas es demasiado larga para cubrirla en este capítulo, pero si una función puede operar fila por fila sin ningún conocimiento de las filas anteriores o posteriores, puede que ya esté implementada en Dask DataFrames utilizando map_partitions
.
Cuando utilices map_partitions
en un Marco de Datos, puedes cambiar cualquier cosa de cada fila, incluida la clave de partición. Si cambias los valores de la clave de partición, debes borrar la información de partición del Marco de datos resultante con clear_divisions()
o especificar la indexación correcta con set_index
, sobre la que aprenderás más en la siguiente sección.
Advertencia
Una información incorrecta sobre la partición puede dar lugar a resultados incorrectos, no sólo a excepciones, ya que Dask puede pasar por alto datos relevantes.
Trabajar con varios DataFrames
Pandas y Dask tienen cuatro funciones comunes para combinar DataFrames. En la raíz está la función concat
, que te permite unir DataFrames en cualquier eje. La concatenación de DataFrames suele ser más lenta en Dask, ya que implica comunicación entre trabajadores. Las otras tres funciones son join
, merge
, y append
, todas las cuales implementan casos especiales para situaciones comunes sobre concat
y tienen consideraciones de rendimiento ligeramente diferentes. Tener buenas divisiones/particiones, en términos de selección de claves y número de particiones, marca una gran diferencia cuando se trabaja con múltiples DataFrames.
Las funciones join
y merge
de Dask toman la mayoría de los argumentos estándar de pandas junto con uno adicional opcional, npartitions
. npartitions
especifica un número objetivo de particiones de salida, pero sólo se utiliza para uniones hash (de las que aprenderás en "Internos de Multi-DataFrame"). Ambas funciones reparticionan automáticamente tus Marcos de Datos de entrada si es necesario. Esto está muy bien, ya que puede que no conozcas el particionado, pero como el reparticionado puede ser lento, utilizar explícitamente la función de nivel inferior concat
cuando no esperes que se necesite ningún cambio en el particionado puede ayudar a detectar problemas de rendimiento con antelación. El join
de Dask puede tomar más de dos DataFrames a la vez sólo cuando se hace un tipo de join izquierdo o externo.
Consejo
Dask tiene una lógica especial para acelerar las uniones multi-DataFrame, por lo que en la mayoría de los casos, en lugar de hacer a.join(b).join(c).join(d).join(e)
, te beneficiarás de hacer a.join([b, c, d, e])
. Sin embargo, si estás realizando una unión a la izquierda con un conjunto de datos pequeño, puede que la primera sintaxis sea más eficiente.
Cuando combinas o concat
Marcos de Datos por filas (de forma similar a una UNIÓN SQL), el rendimiento depende de si las divisiones de los Marcos de Datos que se combinan están bien ordenadas. Llamamos bien ordenadas a las divisiones de una serie de DataFrames si todas las divisiones son conocidas y la división más alta del DataFrame anterior está por debajo de la de la división más baja del siguiente. Si alguna entrada tiene una división desconocida, Dask producirá una salida sin partición conocida. Con todas las particiones conocidas, Dask trata las concatenaciones basadas en filas como un cambio sólo de metadatos y no realizará ningún barajado. Esto requiere que no haya solapamiento entre las divisiones. También hay un parámetro extra interleave_partitions
, que cambiará el tipo de unión para las combinaciones basadas en filas a uno sin la restricción de partición de entrada y dará como resultado un particionador conocido. Los DataFrames Dask con particionadores conocidos pueden soportar búsquedas y operaciones por clave más rápidas.
La concat
basada en columnas de Dask (similar a una JOIN de SQL) también tiene restricciones en cuanto a las divisiones/particiones de los DataFrames que está combinando. La versión de Dask de concat
sólo admite la unión interna o externa completa, no la izquierda ni la derecha. Las uniones basadas en columnas requieren que todas las entradas tengan particionadores conocidos y también dan como resultado un Marco de datos con partición conocida. Tener un particionador conocido puede ser útil para las uniones posteriores.
Advertencia
No utilices el concat
de Dask cuando operes por filas en un DataFrame con divisiones desconocidas, ya que es probable que devuelva resultados incorrectos.8
Funcionamiento interno de Multi-DataFrame
Dask utiliza cuatro técnicas -hash, broadcast, partitioned y stack_partitions
- para combinar DataFrames, y cada una de ellas tiene un rendimiento muy diferente. Estas cuatro funciones no se corresponden 1:1 con las funciones de unión que elijas. Más bien, Dask elige la técnica en función de los índices, las divisiones y el tipo de unión solicitada (por ejemplo, externa/izquierda/interna). Las tres técnicas de unión basadas en columnas son las uniones hash, las uniones de difusión y las uniones particionadas. Al hacer combinaciones basadas en filas (por ejemplo, append
), Dask tiene una técnica especial llamada stack_partitions
que es extra rápida. Es importante que entiendas el rendimiento de cada una de estas técnicas y las condiciones que harán que Dask elija cada enfoque:
- Uniones hash
-
Es la que Dask utiliza por defecto cuando no hay otra técnica de unión adecuada. Las uniones hash barajan los datos de todos los DataFrame de entrada para dividirlos en la clave objetivo. Utilizan los valores hash de las claves, lo que da como resultado un Marco de datos que no está en ningún orden concreto. Como tal, el resultado de una unión hash no tiene divisiones conocidas.
- La radiodifusión se une
-
Ideal para unir grandes DataFrames con DataFrames pequeños. En una unión de difusión, Dask toma el DataFrame más pequeño y lo distribuye a todos los trabajadores. Esto significa que el DataFrame más pequeño debe caber en la memoria. Para indicar a Dask que un DataFrame es un buen candidato para la difusión, asegúrate de que está todo almacenado en una partición, por ejemplo, llamando a
repartition(npartitions=1)
. - Uniones particionadas
-
Ocurre al combinar DataFrames a lo largo de un índice en el que las particiones/divisiones son conocidas para todos los DataFrames. Como se conocen las particiones de entrada, Dask puede alinear las particiones entre los DataFrames, lo que implica menos transferencia de datos, ya que cada partición de salida tiene menos que un conjunto completo de entradas.
Dado que las uniones particionadas y de difusión son más rápidas, hacer algo de trabajo para ayudar a Dask puede merecer la pena. Por ejemplo, concatenar varios DataFrames con particiones/divisiones conocidas y alineadas y un DataFrame sin alinear dará lugar a una costosa unión hash. En su lugar, intenta establecer el índice y la partición en el DataFrame restante o unir primero los DataFrames menos caros y realizar después la unión cara.
La cuarta técnica, stack_partitions
, es diferente de las otras opciones, ya que no implica ningún movimiento de datos. En su lugar, la lista de particiones del DataFrame resultante es una unión de las particiones ascendentes de los DataFrame de entrada. Dask utiliza stack_partitions
para la mayoría de las combinaciones basadas en filas, excepto cuando todas las divisiones del DataFrame de entrada son conocidas, no están bien ordenadas, y le pides a Dask que interleave_partitions
. La técnica stack_partitions
es capaz de proporcionar particiones conocidas en su salida sólo cuando las divisiones de entrada son conocidas y están bien ordenadas. Si todas las divisiones son conocidas pero no están bien ordenadas y le pides a interleave_partitions
, Dask utilizará en su lugar una unión particionada. Aunque este enfoque es comparativamente barato, no es gratuito, y puede dar lugar a un número excesivamente grande de particiones, lo que te obligaría a reparticionar de todos modos.
Lo que no funciona
El DataFrame de Dask implementa la mayor parte, pero no toda, la API de DataFrame de pandas. Parte de la API de pandas no se implementa en Dask por el tiempo de desarrollo que implica. Otras partes no se utilizan para evitar exponer una API que sería inesperadamente lenta.
A veces a la API sólo le faltan pequeñas partes, ya que tanto pandas como Dask están en desarrollo activo. Un ejemplo es la función split
del Ejemplo 2-10. En pandas local, en lugar de hacer split().explode()
, podrías haber llamado a split(expand=true)
. Algunas de estas partes que faltan pueden ser lugares excelentes para que te involucres y contribuyas al proyecto Dask, si te interesa.
Algunas bibliotecas no paralelizan tan bien como otras. En estos casos, un enfoque habitual es intentar filtrar o agregar los datos lo suficiente como para que puedan representarse localmente y, a continuación, aplicar las bibliotecas locales a los datos. Por ejemplo, en el caso de los gráficos, es habitual agregar previamente los recuentos o tomar una muestra aleatoria y representar gráficamente el resultado.
Aunque gran parte de la API de pandas DataFrame funcionará, antes de cambiarla por Dask DataFrame, es importante asegurarse de que tienes una buena cobertura de pruebas para detectar las situaciones en las que no funciona.
Qué es más lento
Normalmente, utilizar Dask DataFrames mejorará el rendimiento, pero no siempre. En general, los conjuntos de datos más pequeños funcionarán mejor en pandas locales. Como ya se ha dicho, todo lo que implique barajar suele ser más lento en un sistema distribuido que en uno local. Los algoritmos iterativos también pueden producir grandes gráficos de operaciones, que son lentos de evaluar en Dask en comparación con la evaluación codiciosa tradicional.
Algunos problemas son generalmente inadecuados para la computación paralela de datos. Por ejemplo, escribir en un almacén de datos con un único bloqueo que tiene más escritores paralelos aumentará la contención de bloqueos y puede hacerlo más lento que si lo hiciera un único hilo. En estas situaciones, a veces puedes reparticionar los datos o escribir particiones individuales para evitar la contención de bloqueos.
Manejo de algoritmos recursivos
La evaluación perezosa de Dask, impulsada por su gráfico de linaje, es normalmente beneficiosa, ya que le permite combinar pasos automáticamente. Sin embargo, cuando el gráfico se hace demasiado grande, Dask puede tener dificultades para gestionarlo, lo que a menudo se manifiesta como un proceso lento del controlador o del bloc de notas, y a veces como una excepción de falta de memoria. Afortunadamente, puedes evitarlo escribiendo tu DataFrame y volviéndolo a leer. Por lo general, Parquet es el mejor formato para hacerlo, ya que ocupa poco espacio y se autodescribe, por lo que no es necesario inferir esquemas.
Datos recalculados
Otro reto de la evaluación perezosa es si quieres reutilizar un elemento varias veces. Por ejemplo, supongamos que quieres cargar unos cuantos DataFrames y luego calcular varias informaciones. Puedes pedir a Dask que mantenga una colección (incluyendo DataFrame, series, etc.) en memoria ejecutando client.persist(collection)
. No es necesario evitar que se vuelvan a calcular todos los datos; por ejemplo, si la carga de los DataFrames es lo suficientemente rápida, puede estar bien no persistir en ellos.
Advertencia
Al igual que otras funciones en Dask, persist()
no modifica el DataFrame, y si llamas a funciones sobre él, seguirás teniendo tus datos recalculados. Esto es notablemente diferente de Apache Spark.
En qué se diferencian otras funciones
Por razones de rendimiento, varias partes de los DataFrames de Dask se comportan de forma algo diferente a los DataFrames locales:
reset_index
-
El índice volverá a empezar en cero en cada partición.
kurtosis
-
Esta función no filtra los NaN y utiliza los valores por defecto de SciPy.
concat
-
En lugar de coaccionar los tipos de categoría, cada tipo de categoría se expande a la unión de todas las categorías con las que se concatena.
sort_values
-
Dask sólo admite ordenaciones de una columna.
- Unir varios DataFrames
-
Cuando se unen más de dos DataFrames al mismo tiempo, el tipo de unión debe ser externo o izquierdo.
Cuando portes tu código para utilizar los DataFrames de Dash, debes tener especial cuidado siempre que utilices estas funciones, ya que puede que no funcionen exactamente en el eje que pretendías. Trabaja primero a pequeña escala y comprueba que los números son correctos, ya que a menudo puede ser difícil localizar los problemas.
Cuando portes código pandas existente a Dask, considera la posibilidad de utilizar la versión local de una sola máquina para producir conjuntos de datos de prueba con los que comparar los resultados, para asegurarte de que todos los cambios son intencionados.
Ciencia de datos con Dask DataFrame: Poniéndolo todo junto
Dask DataFrame ya ha demostrado ser un marco popular para usos de big data, por lo que queríamos destacar un caso de uso común y sus consideraciones. Aquí, utilizamos un conjunto de datos canónico del reto de la ciencia de datos, el taxi amarillo de la ciudad de Nueva York, y recorremos lo que un ingeniero de datos que trabaje con este conjunto de datos podría considerar. En los capítulos siguientes, que cubren las cargas de trabajo de ML, utilizaremos muchas de las herramientas de DataFrame para basarnos en ellas.
Decidir utilizar Dask
Como ya hemos dicho, Dask destaca en tareas paralelas a datos. Un ajuste particularmente bueno es un conjunto de datos que ya puede estar disponible en formato columnar, como Parquet. También evaluamos dónde viven los datos, como en S3 o en otras opciones de almacenamiento remoto. Muchos científicos de datos e ingenieros probablemente tendrían un conjunto de datos que no puede contenerse en una sola máquina o no puede almacenarse localmente debido a restricciones de cumplimiento. El diseño de Dask se presta bien a estos casos de uso.
Nuestros datos de taxis de la ciudad de Nueva York se ajustan a todos estos criterios: los datos están almacenados en S3 por la ciudad de Nueva York en formato Parquet, y son fácilmente escalables hacia arriba y hacia abajo, ya que están particionados por fechas. Además, evaluamos que los datos ya están estructurados, por lo que podemos utilizar Dask DataFrame. Como los DataFrames de Dask y los DataFrames de pandas son similares, también podemos utilizar muchos de los flujos de trabajo existentes para pandas. Podemos probar algunos de ellos, hacer nuestro análisis exploratorio de datos en un entorno de desarrollo más pequeño, y luego ampliarlo al conjunto de datos completo, todo con el mismo código. Observa que en el Ejemplo 4-16, utilizamos grupos de filas para especificar el comportamiento de fragmentación.
Ejemplo 4-16. Dask DataFrame cargando varios archivos Parquet
filename
=
'./nyc_taxi/*.parquet'
df_x
=
dd
.
read_parquet
(
filename
,
split_row_groups
=
2
)
Análisis exploratorio de datos con Dask
El primer paso de la ciencia de datos a menudo consiste en el análisis exploratorio de datos (AED), o en comprender el conjunto de datos y trazar su forma. Aquí, utilizamos los DataFrames de Dask para recorrer el proceso y examinar los problemas comunes de resolución de problemas que surgen de las diferencias matizadas entre los DataFrame de pandas y los DataFrame de Dask.
Carga de datos
La primera vez que cargues los datos en en tu entorno de desarrollo, puede que te encuentres con problemas de tamaño de bloque o de esquema. Aunque Dask intenta deducir ambos, a veces no puede. Los problemas de tamaño de bloque suelen aparecer cuando llamas a .compute()
en código trivial y ves que un trabajador alcanza el techo de memoria. En ese caso, habría que hacer algo de trabajo manual para determinar el tamaño de trozo correcto. Los problemas de esquema aparecerán como un error o una advertencia cuando leas los datos, o de forma sutil más adelante, como la falta de correspondencia entre float32 y float64. Si ya conoces el esquema, es una buena idea reforzarlo especificando dtypes en la lectura.
Cuando explores más a fondo un conjunto de datos, puede que encuentres datos impresos por defecto en un formato que no te guste, por ejemplo, la notación científica. El control para ello es a través de pandas, no del propio Dask. Dask llama implícitamente a pandas, por lo que debes establecer explícitamente tu formato preferido utilizando pandas.
Las estadísticas resumidas de los datos funcionan igual que .describe()
de pandas, junto con los percentiles especificados o .quantile()
. Recuerda encadenar varios cálculos si estás ejecutando varios de estos, lo que ahorrará tiempo de cálculo de ida y vuelta. El uso de Dask DataFrame describe
se muestra en el Ejemplo 4-17.
Ejemplo 4-17. DataFrame Dask que describe percentiles con un bonito formato
import
pandas
as
pd
pd
.
set_option
(
'display.float_format'
,
lambda
x
:
'
%.5f
'
%
x
)
df
.
describe
(
percentiles
=
[
.25
,
.5
,
.75
])
.
compute
()
Trazar datos
Trazar los datos suele ser un paso importante en para conocer tu conjunto de datos. Trazar grandes datos es un tema delicado. Nosotros, como ingenieros de datos, a menudo sorteamos ese problema trabajando primero con un conjunto de datos de muestra más pequeña. Para ello, Dask trabajaría junto a una biblioteca de ploteo de Python como matplotlib o seaborn, al igual que pandas. La ventaja de Dask DataFrame es que ahora podemos trazar todo el conjunto de datos, si lo deseamos. Podemos utilizar marcos de trazado junto con Dask para trazar todo el conjunto de datos. En este caso, Dask realiza el filtrado, la agregación en los trabajadores distribuidos y, a continuación, recopila todo en un trabajador para entregárselo a una biblioteca no distribuida, como matplotlib, para que lo procese. En el Ejemplo 4-18 se muestra el trazado de un DataFrame Dask.
Ejemplo 4-18. Dask DataFrame trazando la distancia del viaje
import
matplotlib.pyplot
as
plt
import
seaborn
as
sns
import
numpy
as
np
get_ipython
()
.
run_line_magic
(
'matplotlib'
,
'inline'
)
sns
.
set
(
style
=
"white"
,
palette
=
"muted"
,
color_codes
=
True
)
f
,
axes
=
plt
.
subplots
(
1
,
1
,
figsize
=
(
11
,
7
),
sharex
=
True
)
sns
.
despine
(
left
=
True
)
sns
.
distplot
(
np
.
log
(
df
[
'trip_distance'
]
.
values
+
1
),
axlabel
=
'Log(trip_distance)'
,
label
=
'log(trip_distance)'
,
bins
=
50
,
color
=
"r"
)
plt
.
setp
(
axes
,
yticks
=
[])
plt
.
tight_layout
()
plt
.
show
()
Consejo
Ten en cuenta que si estás acostumbrado a la lógica de NumPy, tendrás que pensar en la capa Dask DataFrame al trazar. Por ejemplo, los usuarios de NumPy estarían familiarizados con la sintaxis df[col].values
para definir variables de trazado. El .values
significa una acción diferente en Dask; lo que pasamos es df[col]
en su lugar.
Inspección de datos
Los usuarios de Pandas DataFrame estarían familiarizados con .loc()
y .iloc()
para inspeccionar datos en una fila o columna concretas. Esta lógica se traslada a Dask DataFrame, con importantes diferencias en los comportamientos de .iloc()
.
Un DataFrame Dask suficientemente grande contendrá múltiples DataFrames pandas. Esto cambia la forma en que debemos pensar sobre la numeración y el direccionamiento de los índices. Por ejemplo, .iloc()
(una forma de acceder a las posiciones por índice) no funciona exactamente igual para Dask, ya que cada DataFrame más pequeño tendría su propio valor .iloc()
, y Dask no hace un seguimiento del tamaño de cada DataFrame más pequeño. En otras palabras, un valor de índice global es difícil de averiguar para Dask, ya que Dask tendrá que contar iterativamente a través de cada DataFrame para llegar a un índice. Los usuarios deben comprobar .iloc()
en sus DataFrame y asegurarse de que los índices devuelven los valores correctos.
Consejo
Ten en cuenta que llamar a métodos como .reset_index()
puede restablecer los índices en cada uno de los DataFrames más pequeños, devolviendo potencialmente múltiples valores cuando los usuarios llamen a .iloc()
.
Conclusión
En este capítulo, has aprendido a comprender qué tipos de operaciones son más lentas de lo que cabría esperar con Dask. También has adquirido una serie de técnicas para hacer frente a las diferencias de rendimiento entre los DataFrames de pandas y los DataFrames de Dask. Al comprender las situaciones en las que el rendimiento de los DataFrames de Dask puede no satisfacer tus necesidades, también has conseguido entender qué problemas no se adaptan bien a Dask. Para que puedas juntar todo esto, también has aprendido sobre las opciones de IO de los DataFrame de Dask. A partir de aquí, seguirás aprendiendo más sobre las demás colecciones de Dask y luego sobre cómo ir más allá de las colecciones.
En este capítulo, has aprendido qué puede hacer que tus DataFrames Dask se comporten de forma diferente o más lentamente de lo que cabría esperar. Esta misma comprensión de cómo se implementan los DataFrames Dask puede ayudarte a decidir si los DataFrames distribuidos son adecuados para tu problema. También has visto cómo hacer entrar y salir de los DataFrames de Dask conjuntos de datos mayores de los que puede manejar una sola máquina.
1 Consulta "Particionar/reunir colecciones" para una revisión de la partición.
2 La documentación de FSSPEC incluye los detalles para configurar cada uno de los backends.
3 Esto puede dar lugar a excepciones fuera de memoria mientras se ejecuta la agregación. El crecimiento lineal del almacenamiento exige que (dentro de un factor constante) todos los datos puedan caber en un único proceso, lo que limita la eficacia de Dask.
4 Los algoritmos alternativos para cuantiles exactos dependen de más barajados para reducir la sobrecarga de espacio.
5 La desviación de claves puede hacer que esto sea imposible para un particionador conocido.
6 Estrictamente creciente sin valores repetidos (por ejemplo, 1, 4, 7 es monotónicamente creciente, pero 1, 4, 4, 7 no lo es).
7 Los problemas embarazosamente paralelos son aquellos en los que la sobrecarga de la computación distribuida y la comunicación es baja.
8 Dask asume que los índices están alineados cuando no hay índices presentes.
Get Escalando Python con Dask now with the O’Reilly learning platform.
O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.