Capítulo 4. Uniones Uniones (SQL y Core)
Este trabajo se ha traducido utilizando IA. Agradecemos tus opiniones y comentarios: translation-feedback@oreilly.com
Unir datos es una parte importante de muchos de nuestros pipelines, y tanto Spark Core como SQL admiten los mismos tipos fundamentales de uniones.Aunque las uniones son muy comunes y potentes, justifican una consideración especial sobre el rendimiento, ya que pueden requerir grandes transferencias de red o incluso crear conjuntos de datos más allá de nuestra capacidad de manejo.1 En el núcleo de Spark puede ser más importante pensar en el orden de las operaciones, ya que el optimizador DAG, a diferencia del optimizador SQL, no es capaz de reordenar o empujar hacia abajo los filtros.
Core Spark se une
En esta sección repasaremos las uniones de tipo RDD.Las uniones en general son caras, ya que requieren que las claves correspondientes de cada RDD se encuentren en la misma partición para poder combinarlas localmente. Si los RDD no tienen particionadores conocidos, habrá que barajarlos para que ambos RDD compartan un particionador, y los datos con las mismas claves vivan en las mismas particiones, como se muestra en la Figura 4 -1. Si tienen el mismo particionador, los datos pueden colocarse, como en la Figura 4-3, para evitar la transferencia de red. Independientemente de que los particionadores sean los mismos, si uno (o ambos) de los RDD tiene un particionador conocido, sólo se crea una estrecha dependencia, como en la Figura 4-2. Como en la mayoría de las operaciones clave/valor, el coste de la unión aumenta con el número de claves y la distancia que tienen que recorrer los registros para llegar a su partición correcta.
Consejo
Dos RDD estarán colocados si tienen el mismo particionador y se barajaron como parte de la misma acción.
Consejo
Las uniones del núcleo de Spark se implementan utilizando la función cogroup
. Hablamos de cogroup
en "Coagrupación".
Elegir un tipo de unión
La operación de unión por defecto en Spark sólo incluye los valores de las claves presentes en ambos RDD, y en el caso de múltiples valores por clave, proporciona todas las permutaciones del par clave/valor. El mejor escenario para una unión estándar es cuando ambos RDD contienen el mismo conjunto de claves distintas. Con claves duplicadas, el tamaño de los datos puede aumentar drásticamente causando problemas de rendimiento, y si una clave no está presente en ambos RDD perderás esa fila de datos. He aquí algunas directrices:
-
Cuando ambos RDD tienen claves duplicadas, la unión puede hacer que el tamaño de los datos aumente drásticamente. Puede ser mejor realizar una operación
distinct
ocombineByKey
para reducir el espacio de claves o utilizarcogroup
para manejar las claves duplicadas en lugar de producir el producto cruzado completo. Utilizando una partición inteligente durante el paso combinar, es posible evitar una segunda mezcla en la unión (más adelante hablaremos de ello en detalle). -
Si las claves no están presentes en ambos RDD, corres el riesgo de perder los datos inesperadamente. Puede ser más seguro utilizar una unión externa, de forma que te garantices conservar todos los datos en el RDD izquierdo o en el derecho, y luego filtrar los datos después de la unión.
-
Si un RDD tiene algún subconjunto de claves fácil de definir, en el otro puede ser mejor filtrar o reducir antes de la unión para evitar una gran mezcla de datos, que al final desecharás de todos modos.
Consejo
La unión es una de las operaciones más caras que utilizarás habitualmente en Spark, por lo que merece la pena que hagas lo posible por reducir tus datos antes de realizar una unión.
Por ejemplo, supongamos que tienes un RDD con unos datos de la forma (Panda id, score)
y otro RDD con (Panda id, address)
, y quieres enviar a cada panda un correo con su mejor puntuación. Podrías unir los RDD en id
y luego calcular la mejor puntuación para cada address
, como se muestra en el Ejemplo 4-1.
Ejemplo 4-1. Unión RDD básica
def
joinScoresWithAddress1
(
scoreRDD
:
RDD
[(
Long
,Double
)],
addressRDD
:
RDD
[(
Long
,String
)])
:
RDD
[(
Long
,(
Double
,String
))]
=
{
val
joinedRDD
=
scoreRDD
.
join
(
addressRDD
)
joinedRDD
.
reduceByKey
(
(
x
,
y
)
=>
if
(
x
.
_1
>
y
.
_1
)
x
else
y
)
}
Sin embargo, probablemente esto no sea tan rápido como reducir primero los datos de puntuación, de modo que el primer conjunto de datos contenga sólo una fila para cada panda con su mejor puntuación, y luego unir esos datos con los de dirección (como se muestra en el Ejemplo 4-2).
Ejemplo 4-2. Prefiltro antes de unir
def
joinScoresWithAddress2
(
scoreRDD
:
RDD
[(
Long
,Double
)],
addressRDD
:
RDD
[(
Long
,String
)])
:
RDD
[(
Long
,(
Double
,String
))]
=
{
val
bestScoreData
=
scoreRDD
.
reduceByKey
((
x
,
y
)
=>
if
(
x
>
y
)
x
else
y
)
bestScoreData
.
join
(
addressRDD
)
}
Si cada Panda tuviera 1.000 puntuaciones diferentes, ¡entonces el tamaño de la barajada que hicimos en el primer enfoque sería 1.000 veces el tamaño de la barajada que hicimos con este enfoque!
Si quisiéramos, también podríamos realizar un left outer join para conservar todas las claves para procesar , incluso las que faltan en el RDD derecho, utilizando leftOuterJoin
en lugar de join
, como en el Ejemplo 4-3. Spark también dispone de fullOuterJoin
y rightOuterJoin
en función de los registros que deseemos conservar. Los valores que faltan son None
y los valores presentes son Some('x')
.
Ejemplo 4-3. RDD básico left outer join
def
outerJoinScoresWithAddress
(
scoreRDD
:
RDD
[(
Long
,Double
)],
addressRDD
:
RDD
[(
Long
,String
)])
:
RDD
[(
Long
,(
Double
,Option
[
String
]))]
=
{
val
joinedRDD
=
scoreRDD
.
leftOuterJoin
(
addressRDD
)
joinedRDD
.
reduceByKey
(
(
x
,
y
)
=>
if
(
x
.
_1
>
y
.
_1
)
x
else
y
)
}
Elegir un plan de ejecución
Para unir datos, Spark necesita que los datos que se van a unir (es decir, los datos basados en cada clave) vivan en la misma partición. La implementación por defecto de una unión en Spark es una unión hash aleatoria. La unión hash barajada garantiza que los datos de cada partición contendrán las mismas claves particionando el segundo conjunto de datos con el mismo particionador por defecto que el primero, de modo que las claves con el mismo valor hash de ambos conjuntos de datos estén en la misma partición. Aunque este método siempre funciona, puede ser más caro de lo necesario porque requiere una mezcla. La mezcla puede evitarse si
-
Ambos RDD tienen un particionador conocido.
-
Uno de los conjuntos de datos es lo suficientemente pequeño como para caber en memoria, en cuyo caso podemos hacer una unión hash de difusión (más adelante explicaremos en qué consiste).
Ten en cuenta que si los RDD están colocados, se puede evitar la transferencia de red, junto con la barajada.
Acelerar las uniones asignando un particionador conocido
Si tienes que hacer una operación antes de la unión que requiera un barajado, como aggregateByKey
o reduceByKey
, puedes evitar el barajado añadiendo un particionador hash con el mismo número de particiones como argumento explícito a la primera operación antes de la unión. Podrías hacer el ejemplo del apartado anterior aún más rápido, utilizando el particionador para los datos de dirección como argumento para el paso reduceByKey
, como en el Ejemplo 4-4 y la Figura 4-4.
Ejemplo 4-4. Unión de particionadores conocida
def
joinScoresWithAddress3
(
scoreRDD
:
RDD
[(
Long
,Double
)],
addressRDD
:
RDD
[(
Long
,String
)])
:
RDD
[(
Long
,(
Double
,String
))]
=
{
// If addressRDD has a known partitioner we should use that,
// otherwise it has a default hash parttioner, which we can reconstruct by
// getting the number of partitions.
val
addressDataPartitioner
=
addressRDD
.
partitioner
match
{
case
(
Some
(
p
))
=>
p
case
(
None
)
=>
new
HashPartitioner
(
addressRDD
.
partitions
.
length
)
}
val
bestScoreData
=
scoreRDD
.
reduceByKey
(
addressDataPartitioner
,
(
x
,
y
)
=>
if
(
x
>
y
)
x
else
y
)
bestScoreData
.
join
(
addressRDD
)
}
Consejo
Si los RDD que comparten el mismo particionador son materializados por la misma acción, acabarán siendo co-localizados (lo que puede incluso reducir el tráfico de red).
Consejo
(Casi) siempre persisten tras la repartición.
Acelerar las uniones utilizando una unión hash de difusión
Un hash join de difusión envía uno de los RDD (el más pequeño) a cada uno de los nodos trabajadores. A continuación, realiza una combinación del lado del mapa con cada partición del RDD mayor. Si uno de tus RDDs cabe en la memoria o se puede hacer que quepa en la memoria, siempre es beneficioso hacer una unión hash de difusión, ya que no requiere una mezcla. A veces (pero no siempre) Spark SQL será lo suficientemente inteligente como para configurar la unión de difusión por sí mismo; en Spark SQL esto se controla con spark.sql.autoBroadcastJoinThreshold
y spark.sql.broadcastTimeout
. Esto se ilustra en la Figura 4-5.
Spark Core no tiene una implementación de la unión hash de difusión. En su lugar, podemos implementar manualmente una versión de la unión hash de difusión recogiendo el RDD más pequeño en el controlador como un mapa, luego difundiendo el resultado y utilizando mapPartitions
para combinar los elementos.
El ejemplo 4-5 es una función general que podría utilizarse para unir un RDD más grande y otro más pequeño. Su comportamiento refleja la operación "unir" por defecto en Spark. Excluimos los elementos cuyas claves no aparecen en ambos RDD.
Ejemplo 4-5. Unión hash de difusión manual
def
manualBroadCastHashJoin
[
K
:
Ordering
:
ClassTag
,V1
:
ClassTag
,V2
:
ClassTag
](
bigRDD
:
RDD
[(
K
,V1
)],
smallRDD
:
RDD
[(
K
,V2
)])
=
{
val
smallRDDLocal
:
Map
[
K
,V2
]
=
smallRDD
.
collectAsMap
()
val
smallRDDLocalBcast
=
bigRDD
.
sparkContext
.
broadcast
(
smallRDDLocal
)
bigRDD
.
mapPartitions
(
iter
=>
{
iter
.
flatMap
{
case
(
k
,
v1
)
=>
smallRDDLocalBcast
.
value
.
get
(
k
)
match
{
case
None
=>
Seq
.
empty
[(
K
,(
V1
,V2
))]
case
Some
(
v2
)
=>
Seq
((
k
,
(
v1
,
v2
)))
}
}
},
preservesPartitioning
=
true
)
}
//end:coreBroadCast[]
}
Unión hash de difusión manual parcial
A veces, no todo nuestro RDD más pequeño cabe en la memoria, pero algunas claves están tan sobrerrepresentadas en el conjunto de datos grande que quieres difundir sólo las claves más comunes.Esto es especialmente útil si una clave es tan grande que no cabe en una sola partición.En este caso, puedes utilizar countByKeyApprox
2 en el RDD grande para tener una idea aproximada de qué claves se beneficiarían más de una difusión.A continuación, filtra el RDD más pequeño sólo para estas claves, recogiendo el resultado localmente en un HashMap. Utilizando sc.broadcast
puedes difundir el HashMap para que cada trabajador sólo tenga una copia y realizar manualmente la unión contra el HashMap. Utilizando el mismo HashMap, puedes filtrar el RDD grande para no incluir el gran número de claves duplicadas y realizar la unión estándar, uniéndola con el resultado de la unión manual. Este método es bastante complicado, pero puede permitirte manejar datos muy sesgados que no podrías procesar de otro modo.
Uniones SQL Spark
Spark SQL admite los mismos tipos básicos de uniones que el núcleo de Spark, pero el optimizador es capaz de hacer más trabajo pesado por ti, aunque también renuncias a parte de tu control. Por ejemplo, Spark SQL a veces puede reducir o reordenar las operaciones para que tus uniones sean más eficientes. Por otra parte, no controlas el particionador de DataFrames
o Datasets
, por lo que no puedes evitar manualmente las mezclas como hacías con las uniones de Spark.
Uniones DataFrame
La unión de datos entre DataFrames
es una de las transformaciones multiDataFrame
más comunes. Todos los tipos de unión SQL estándar son compatibles y se pueden especificar como joinType
en df.join(otherDf, sqlCondition, joinType)
al realizar una unión.
Al igual que con las uniones entre RDDs, la unión con claves no únicas dará como resultado el producto cruzado (por lo que si la tabla izquierda tiene R1 y R2 con clave1 y la tabla derecha tiene R3 y R5 con clave1 obtendrás (R1, R3), (R1, R5), (R2, R3), (R2, R5)) en la salida. Mientras exploramos las uniones SQL de Spark utilizaremos dos tablas de ejemplo de pandas, las Tablas 4-1 y 4-2.
Advertencia
Aunque se admiten las autouniones, debes asignar previamente alias a los campos que te interesen con nombres diferentes, para que se pueda acceder a ellos.
Nombre | Talla |
---|---|
Feliz |
1.0 |
Triste |
0.9 |
Feliz |
1.5 |
Café |
3.0 |
Nombre | Código postal |
---|---|
Feliz |
94110 |
Feliz |
94103 |
Café |
10504 |
Té |
07012 |
Los tipos de unión admitidos por Spark son "inner", "left_outer" (alias "outer"), "left_anti", "right_outer", "full_outer" y "left_semi".3 A excepción de "left_semi", todos estos tipos de unión unen las dos tablas, pero se comportan de forma diferente cuando manejan filas que no tienen claves en ambas tablas.
La unión "interna" es la predeterminada y probablemente en la que piensas cuando piensas en unir tablas. Requiere que la clave esté presente en ambas tablas, o el resultado se descartará, como se muestra en el Ejemplo 4-6 y en la Tabla 4-3.
Ejemplo 4-6. Unión interna simple
// Inner join implicit
df1
.
join
(
df2
,
df1
(
"name"
)
===
df2
(
"name"
))
// Inner join explicit
df1
.
join
(
df2
,
df1
(
"name"
)
===
df2
(
"name"
),
"inner"
)
Nombre | Talla | Nombre | Código postal |
---|---|---|---|
Café |
3.0 |
Café |
10504 |
Feliz |
1.5 |
Feliz |
94110 |
Feliz |
1.5 |
Feliz |
94103 |
Feliz |
1.0 |
Feliz |
94110 |
Feliz |
1.0 |
Feliz |
94103 |
Las uniones externas izquierdas producirán una tabla con todas las claves de la tabla izquierda, y cualquier fila sin claves coincidentes en la tabla derecha tendrá valores nulos en los campos que rellenaría la tabla derecha. Las uniones externas derechas son lo mismo, pero con los requisitos invertidos.En el Ejemplo 4-7 hay un ejemplo de unión externa izquierda, y el resultado se muestra en la Tabla 4-4.
Ejemplo 4-7. Unión externa izquierda
// Left outer join explicit
df1
.
join
(
df2
,
df1
(
"name"
)
===
df2
(
"name"
),
"left_outer"
)
Nombre | Talla | Nombre | Código postal |
---|---|---|---|
Triste |
0.9 |
null |
null |
Café |
3.0 |
Café |
10504 |
Feliz |
1.0 |
Feliz |
94110 |
Feliz |
1.0 |
Feliz |
94103 |
Feliz |
1.5 |
Feliz |
94110 |
Feliz |
1.5 |
Feliz |
94103 |
En el Ejemplo 4-8 encontrarás un ejemplo de unión externa derecha, y en la Tabla 4-5 se muestra el resultado.
Ejemplo 4-8. Unión externa derecha
// Right outer join explicit
df1
.
join
(
df2
,
df1
(
"name"
)
===
df2
(
"name"
),
"right_outer"
)
Nombre | Talla | Nombre | Código postal |
---|---|---|---|
Café |
3.0 |
Café |
10504 |
Feliz |
1.0 |
Feliz |
94110 |
Feliz |
1.0 |
Feliz |
94103 |
Feliz |
1.5 |
Feliz |
94110 |
Feliz |
1.5 |
Feliz |
94103 |
null |
null |
Té |
07012 |
Para conservar todos los registros de ambas tablas puedes utilizar la unión externa completa, que da como resultado la Tabla 4-6.
Nombre | Talla | Nombre | Código postal |
---|---|---|---|
Triste |
0.9 |
null |
null |
Café |
3.0 |
Café |
10504 |
Feliz |
1.0 |
Feliz |
94110 |
Feliz |
1.0 |
Feliz |
94103 |
Feliz |
1.5 |
Feliz |
94110 |
Feliz |
1.5 |
Feliz |
94103 |
null |
null |
Té |
07012 |
Las semiuniones izquierdas (como en el Ejemplo 4-9 y la Tabla 4-7) y las antiuniones izquierdas (como en la Tabla 4-8) son los únicos tipos de uniones que sólo tienen valores de la tabla izquierda.Una semiunión izquierda es lo mismo que filtrar la tabla izquierda para obtener sólo filas con claves presentes en la tabla derecha. La antiunión izquierda también sólo devuelve datos de la tabla izquierda, pero en cambio sólo devuelve registros que no están presentes en la tabla derecha.
Ejemplo 4-9. Semiempalme izquierdo
// Left semi join explicit
df1
.
join
(
df2
,
df1
(
"name"
)
===
df2
(
"name"
),
"left_semi"
)
Nombre | Talla |
---|---|
Café |
3.0 |
Feliz |
1.0 |
Feliz |
1.5 |
Nombre | Talla |
---|---|
Triste |
0.9 |
Uno mismo se une
Las autouniones son compatibles con DataFrames
, pero acabamos con los nombres de las columnas duplicados. Para que puedas acceder a los resultados, tienes que poner alias a DataFrames
con nombres diferentes; de lo contrario, no podrás seleccionar las columnas debido a la colisión de nombres (véase el Ejemplo 4-10). Una vez que hayas puesto alias a cada DataFrame
, en el resultado podrás acceder a las columnas individuales de cada DataFrame
con dfName.colName
.
Ejemplo 4-10. Autounión
val
joined
=
df
.
as
(
"a"
).
join
(
df
.
as
(
"b"
)).
where
(
$
"a.name"
===
$
"b.name"
)
Uniones hash de difusión
En Spark SQL puedes ver el tipo de unión que se está realizando llamando a queryExecution.executedPlan
. Al igual que en el núcleo de Spark, si una de las tablas es mucho más pequeña que la otra, es posible que desees una unión hash de difusión. Puedes indicar a Spark SQL que una DF determinada debe difundirse para la unión llamando a broadcast
en la DataFrame
antes de unirla (por ejemplo, df1.join(broadcast(df2), "key")
). Spark también utiliza automáticamente spark.sql.conf.autoBroadcastJoinThreshold
para determinar si una tabla debe ser difundida.
Uniones de conjuntos de datos
La unión de Datasets
se realiza con joinWith
, y se comporta de forma similar a una unión relacional normal, salvo que el resultado es una tupla de los distintos tipos de registro, como se muestra en el Ejemplo 4-11. Esto es algo más incómodo para trabajar después de la unión, pero también hace que las autouniones, como se muestra en el Ejemplo 4-12, sean mucho más fáciles, ya que no necesitas poner primero un alias a las columnas.
Ejemplo 4-11. Unir dos Conjuntos de Datos
val
result
:
Dataset
[(
RawPanda
,CoffeeShop
)]
=
pandas
.
joinWith
(
coffeeShops
,
$
"zip"
===
$
"zip"
)
Ejemplo 4-12. Autounión de un Conjunto de Datos
val
result
:
Dataset
[(
RawPanda
,RawPanda
)]
=
pandas
.
joinWith
(
pandas
,
$
"zip"
===
$
"zip"
)
Nota
Utilizando una autounión y un lit(true)
, puedes obtener el producto cartesiano de tu Dataset
, lo que puede ser útil, pero también ilustra cómo las uniones (especialmente las autouniones) pueden dar lugar fácilmente a tamaños de datos inviables.
Al igual que en DataFrames
, puedes especificar el tipo de unión deseado (por ejemplo, interna, izquierda_exterior, derecha_exterior, izquierda_semi), lo que cambia la forma en que se gestionan los registros presentes sólo en una Dataset
. Los registros que faltan se representan mediante valores nulos, así que ten cuidado.
1 Como dice el refrán, el producto cruzado de big data y grandes datos es una excepción fuera de la memoria.
2 Si el número de claves distintas es demasiado alto, también puedes utilizar reduceByKey
, ordenar por el valor y tomar el k superior.
3 Las comillas son opcionales y pueden omitirse. Las utilizamos en nuestros ejemplos porque creemos que es más fácil de leer con las comillas presentes.
Get Chispa de alto rendimiento now with the O’Reilly learning platform.
O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.