Kapitel 4. Joins (SQL und Core)
Diese Arbeit wurde mithilfe von KI übersetzt. Wir freuen uns über dein Feedback und deine Kommentare: translation-feedback@oreilly.com
Das Verknüpfen von Daten ist ein wichtiger Bestandteil vieler unserer Pipelines, und sowohl Spark Core als auch SQL unterstützen dieselben grundlegenden Arten von Joins.Obwohl Joins sehr verbreitet und leistungsfähig sind, müssen sie in Bezug auf die Leistung besonders berücksichtigt werden, da sie große Netzwerkübertragungen erfordern oder sogar Datensätze erzeugen können, die wir nicht verarbeiten können.1 In Core Spark kann es wichtiger sein, über die Reihenfolge der Operationen nachzudenken, da der DAG-Optimierer im Gegensatz zum SQL-Optimierer nicht in der Lage ist, Filter umzuordnen oder nach unten zu verschieben.
Core Spark tritt bei
In diesem Abschnitt gehen wir auf die RDD-Joins ein.Joins sind im Allgemeinen teuer, da sie erfordern, dass die entsprechenden Schlüssel aus jedem RDD in derselben Partition liegen, damit sie lokal kombiniert werden können. Wenn die RDDs keine bekannten Partitionierer haben, müssen sie so gemischt werden, dass sich beide RDDs einen Partitionierer teilen und die Daten mit denselben Schlüsseln in denselben Partitionen liegen, wie in Abbildung 4-1 gezeigt. Wenn sie denselben Partitionierer haben, können die Daten zusammengelegt werden, wie in Abbildung 4-3, um eine Netzwerkübertragung zu vermeiden. Unabhängig davon, ob die Partitionierer identisch sind, entsteht nur eine enge Abhängigkeit, wenn eine (oder beide) der RDDs einen bekannten Partitionierer haben, wie in Abbildung 4-2. Wie bei den meisten Schlüssel/Wert-Operationen steigen die Kosten der Verknüpfung mit der Anzahl der Schlüssel und der Entfernung, die die Datensätze zurücklegen müssen, um zu ihrer richtigen Partition zu gelangen.
Tipp
Zwei RDDs werden zusammengelegt, wenn sie denselben Partitionierer haben und im Rahmen derselben Aktion gemischt wurden.
Tipp
Core Spark-Joins werden mit der Funktion cogroup
implementiert. Wir besprechen cogroup
in "Co-Grouping".
Auswahl einer Verbindungsart
Die Standard-Join-Operation in Spark enthält nur Werte für Schlüssel, die in beiden RDDs vorhanden sind, und liefert bei mehreren Werten pro Schlüssel alle Permutationen des Schlüssel/Wert-Paares. Das beste Szenario für einen Standard-Join ist, wenn beide RDDs denselben Satz eindeutiger Schlüssel enthalten. Bei doppelten Schlüsseln kann sich die Datenmenge drastisch erhöhen, was zu Leistungsproblemen führen kann, und wenn ein Schlüssel nicht in beiden RDDs vorhanden ist, geht diese Datenzeile verloren. Hier sind ein paar Richtlinien:
-
Wenn beide RDDs doppelte Schlüssel haben, kann die Verknüpfung dazu führen, dass die Größe der Daten dramatisch ansteigt. Es kann besser sein, eine
distinct
odercombineByKey
Operation durchzuführen, um den Schlüsselraum zu verkleinern, odercogroup
zu verwenden, um mit doppelten Schlüsseln umzugehen, anstatt das vollständige Kreuzprodukt zu erzeugen. Durch eine intelligente Partitionierung während des Kombinationsschritts kann ein zweites Mischen in der Verknüpfung verhindert werden (wir werden dies später im Detail besprechen). -
Wenn die Schlüssel nicht in beiden RDDs vorhanden sind, riskierst du einen unerwarteten Datenverlust. Es kann sicherer sein, eine äußere Verknüpfung zu verwenden, so dass du garantiert alle Daten entweder im linken oder im rechten RDD behältst und die Daten nach der Verknüpfung filterst.
-
Wenn ein RDD eine einfach zu definierende Teilmenge der Schlüssel enthält, ist es vielleicht besser, wenn du vor dem Join filterst oder reduzierst, um eine große Datenmenge zu vermeiden, die du am Ende sowieso wegwerfen wirst.
Tipp
Die Verknüpfung ist eine der teuersten Operationen, die du in Spark häufig verwenden wirst. Es lohnt sich also, alles zu tun, um deine Daten zu schrumpfen, bevor du eine Verknüpfung durchführst.
Angenommen, du hast ein RDD mit Daten in der Form (Panda id, score)
und ein anderes RDD mit (Panda id, address)
und du möchtest jedem Panda eine Mail mit seiner besten Punktzahl schicken. Du könntest die RDDs auf id
verbinden und dann die beste Punktzahl für jedes address
berechnen, wie in Beispiel 4-1 gezeigt.
Beispiel 4-1. Grundlegende RDD-Verknüpfung
def
joinScoresWithAddress1
(
scoreRDD
:
RDD
[(
Long
,Double
)],
addressRDD
:
RDD
[(
Long
,String
)])
:
RDD
[(
Long
,(
Double
,String
))]
=
{
val
joinedRDD
=
scoreRDD
.
join
(
addressRDD
)
joinedRDD
.
reduceByKey
(
(
x
,
y
)
=>
if
(
x
.
_1
>
y
.
_1
)
x
else
y
)
}
Dies ist jedoch wahrscheinlich nicht so schnell, wie wenn du zuerst die Punktedaten reduzierst, so dass der erste Datensatz nur eine Zeile für jeden Panda mit seiner besten Punktzahl enthält, und dann diese Daten mit den Adressdaten verbindest (wie in Beispiel 4-2 gezeigt).
Beispiel 4-2. Vorfilter vor der Verknüpfung
def
joinScoresWithAddress2
(
scoreRDD
:
RDD
[(
Long
,Double
)],
addressRDD
:
RDD
[(
Long
,String
)])
:
RDD
[(
Long
,(
Double
,String
))]
=
{
val
bestScoreData
=
scoreRDD
.
reduceByKey
((
x
,
y
)
=>
if
(
x
>
y
)
x
else
y
)
bestScoreData
.
join
(
addressRDD
)
}
Wenn jeder Panda 1.000 verschiedene Punkte hatte, dann war die Größe der Mischung, die wir mit dem ersten Ansatz gemacht haben, 1.000 Mal so groß wie die Mischung, die wir mit diesem Ansatz gemacht haben!
Wenn wir wollten, könnten wir auch einen Left Outer Join durchführen, um alle Schlüssel für die Verarbeitung von zu behalten, auch die, die im rechten RDD fehlen, indem wir leftOuterJoin
anstelle von join
verwenden, wie in Beispiel 4-3. Spark hat auch fullOuterJoin
und rightOuterJoin
, je nachdem, welche Datensätze wir behalten wollen. Alle fehlenden Werte sind None
und vorhandene Werte sind Some('x')
.
Beispiel 4-3. Einfacher RDD Left Outer Join
def
outerJoinScoresWithAddress
(
scoreRDD
:
RDD
[(
Long
,Double
)],
addressRDD
:
RDD
[(
Long
,String
)])
:
RDD
[(
Long
,(
Double
,Option
[
String
]))]
=
{
val
joinedRDD
=
scoreRDD
.
leftOuterJoin
(
addressRDD
)
joinedRDD
.
reduceByKey
(
(
x
,
y
)
=>
if
(
x
.
_1
>
y
.
_1
)
x
else
y
)
}
Auswahl eines Ausführungsplans
Um Daten zu verknüpfen, müssen sich die zu verknüpfenden Daten (d. h. die Daten, die auf den einzelnen Schlüsseln basieren) in derselben Partition befinden. Die Standardimplementierung eines Join in Spark ist ein Shuffled Hash Join. Die gemischte Hash-Verknüpfung stellt sicher, dass die Daten in jeder Partition die gleichen Schlüssel enthalten, indem der zweite Datensatz mit dem gleichen Standardpartitionierer wie der erste partitioniert wird, sodass die Schlüssel mit dem gleichen Hash-Wert aus beiden Datensätzen in der gleichen Partition liegen. Dieser Ansatz funktioniert zwar immer, kann aber teurer sein als nötig, weil er einen Shuffle erfordert. Der Shuffle kann vermieden werden, wenn:
-
Beide RDDs haben einen bekannten Partitionierer.
-
Einer der Datensätze ist so klein, dass er in den Speicher passt. In diesem Fall können wir einen Broadcast Hash Join durchführen (was das ist, erklären wir später).
Wenn die RDDs an einem Ort liegen, kann der Netzwerktransfer und das Mischen vermieden werden.
Beschleunigung von Joins durch Zuweisung eines bekannten Partitionierers
Wenn du vor der Verknüpfung eine Operation durchführen musst, die ein Shuffle erfordert, wie aggregateByKey
oder reduceByKey
, kannst du das Shuffle verhindern, indem du einen Hash-Partitionierer mit der gleichen Anzahl von Partitionen als explizites Argument für die erste Operation vor der Verknüpfung hinzufügst. Du könntest das Beispiel im vorherigen Abschnitt noch schneller machen, indem du den Partitionierer für die Adressdaten als Argument für den Schritt reduceByKey
verwendest, wie in Beispiel 4-4 und Abbildung 4-4.
Beispiel 4-4. Bekannter Partitionierer Join
def
joinScoresWithAddress3
(
scoreRDD
:
RDD
[(
Long
,Double
)],
addressRDD
:
RDD
[(
Long
,String
)])
:
RDD
[(
Long
,(
Double
,String
))]
=
{
// If addressRDD has a known partitioner we should use that,
// otherwise it has a default hash parttioner, which we can reconstruct by
// getting the number of partitions.
val
addressDataPartitioner
=
addressRDD
.
partitioner
match
{
case
(
Some
(
p
))
=>
p
case
(
None
)
=>
new
HashPartitioner
(
addressRDD
.
partitions
.
length
)
}
val
bestScoreData
=
scoreRDD
.
reduceByKey
(
addressDataPartitioner
,
(
x
,
y
)
=>
if
(
x
>
y
)
x
else
y
)
bestScoreData
.
join
(
addressRDD
)
}
Tipp
Wenn die RDDs, die sich denselben Partitionierer teilen, durch dieselbe Aktion materialisiert werden, werden sie schließlich zusammengelegt (was sogar den Netzwerkverkehr reduzieren kann).
Tipp
bleiben nach einer Neuaufteilung (fast) immer bestehen.
Beschleunigung von Joins mit einem Broadcast-Hash-Join
Ein Broadcast Hash Join schiebt eines der RDDs (das kleinere) zu jedem der Worker Nodes. Dann wird eine map-seitige Kombination mit jeder Partition des größeren RDDs durchgeführt. Wenn eines deiner RDDs in den Speicher passt oder dafür gesorgt werden kann, dass es in den Speicher passt, ist es immer vorteilhaft, einen Broadcast-Hash-Join durchzuführen, da kein Shuffle erforderlich ist. Manchmal (aber nicht immer) ist Spark SQL schlau genug, den Broadcast-Join selbst zu konfigurieren; in Spark SQL wird dies mit spark.sql.autoBroadcastJoinThreshold
und spark.sql.broadcastTimeout
gesteuert. Dies ist in Abbildung 4-5 dargestellt.
Spark Core verfügt nicht über eine Implementierung des Broadcast Hash Join. Stattdessen können wir eine Version des Broadcast Hash Join manuell implementieren, indem wir das kleinere RDD als Map an den Treiber übergeben, dann das Ergebnis übertragen und mapPartitions
verwenden, um die Elemente zu kombinieren.
Beispiel 4-5 ist eine allgemeine Funktion, die verwendet werden kann, um ein größeres und ein kleineres RDD zu verbinden. Sie verhält sich genauso wie die Standard-"Join"-Operation in Spark. Wir schließen Elemente aus, deren Schlüssel nicht in beiden RDDs vorkommen.
Beispiel 4-5. Manueller Broadcast Hash Join
def
manualBroadCastHashJoin
[
K
:
Ordering
:
ClassTag
,V1
:
ClassTag
,V2
:
ClassTag
](
bigRDD
:
RDD
[(
K
,V1
)],
smallRDD
:
RDD
[(
K
,V2
)])
=
{
val
smallRDDLocal
:
Map
[
K
,V2
]
=
smallRDD
.
collectAsMap
()
val
smallRDDLocalBcast
=
bigRDD
.
sparkContext
.
broadcast
(
smallRDDLocal
)
bigRDD
.
mapPartitions
(
iter
=>
{
iter
.
flatMap
{
case
(
k
,
v1
)
=>
smallRDDLocalBcast
.
value
.
get
(
k
)
match
{
case
None
=>
Seq
.
empty
[(
K
,(
V1
,V2
))]
case
Some
(
v2
)
=>
Seq
((
k
,
(
v1
,
v2
)))
}
}
},
preservesPartitioning
=
true
)
}
//end:coreBroadCast[]
}
Teilweise manueller Broadcast Hash Join
Manchmal passt nicht das gesamte kleinere RDD in den Speicher, aber einige Schlüssel sind in dem großen Datensatz so überrepräsentiert, dass du nur die häufigsten Schlüssel übertragen willst.Das ist besonders nützlich, wenn ein Schlüssel so groß ist, dass er nicht in eine einzelne Partition passt.In diesem Fall kannst du mit countByKeyApprox
2 auf das große RDD anwenden, um eine ungefähre Vorstellung davon zu bekommen, welche Schlüssel am meisten von einem Broadcast profitieren würden.Dann filterst du das kleinere RDD nur nach diesen Schlüsseln und sammelst das Ergebnis lokal in einer HashMap. Mit sc.broadcast
kannst du die HashMap verteilen, so dass jeder Arbeiter nur eine Kopie hat, und den Join gegen die HashMap manuell durchführen. Mit der gleichen HashMap kannst du dann dein großes RDD so filtern, dass es die vielen doppelten Schlüssel nicht enthält, und deinen Standard-Join durchführen, indem du ihn mit dem Ergebnis deines manuellen Joins zusammenführst. Dieser Ansatz ist ziemlich kompliziert, aber er ermöglicht es dir, mit stark verzerrten Daten umzugehen, die du sonst nicht verarbeiten könntest.
Spark SQL Joins
Spark SQL unterstützt dieselben grundlegenden Join-Typen wie Core Spark, aber der Optimierer ist in der Lage, mehr von der schweren Arbeit für dich zu übernehmen - obwohl du auch einen Teil deiner Kontrolle abgibst. Spark SQL kann zum Beispiel manchmal Operationen nach unten verschieben oder neu anordnen, um deine Joins effizienter zu machen. Andererseits hast du keine Kontrolle über den Partitionierer für DataFrames
oder Datasets
, sodass du Shuffles nicht manuell vermeiden kannst, wie du es bei Core Spark-Joins getan hast.
Datenrahmen Joins
Das Joinen von Daten zwischen DataFrames
ist eine der häufigsten MultiDataFrame
Transformationen. Die Standard-SQL-Join-Typen werden alle unterstützt und können als joinType
in df.join(otherDf, sqlCondition, joinType)
angegeben werden, wenn ein Join durchgeführt wird.
Wie bei Joins zwischen RDDs führt das Joinen mit nicht eindeutigen Schlüsseln zu einem Kreuzprodukt (wenn also die linke Tabelle R1 und R2 mit Schlüssel 1 und die rechte Tabelle R3 und R5 mit Schlüssel 1 hat, erhältst du (R1, R3), (R1, R5), (R2, R3), (R2, R5)) in der Ausgabe. Während wir Spark SQL-Joins untersuchen, verwenden wir zwei Beispieltabellen von Pandas, Tabelle 4-1 und 4-2.
Warnung
Self-Joins werden zwar unterstützt, aber du musst den Feldern, an denen du interessiert bist, vorher andere Namen geben, damit du auf sie zugreifen kannst.
Name | Größe |
---|---|
Glücklich |
1.0 |
Traurig |
0.9 |
Glücklich |
1.5 |
Kaffee |
3.0 |
Name | Zip |
---|---|
Glücklich |
94110 |
Glücklich |
94103 |
Kaffee |
10504 |
Tee |
07012 |
Die von Spark unterstützten join Typen sind "inner", "left_outer" (alias "outer"), "left_anti", "right_outer", "full_outer" und "left_semi".3 Mit Ausnahme von "left_semi" verbinden alle diese Join-Typen die beiden Tabellen, aber sie verhalten sich unterschiedlich, wenn es um Zeilen geht, die nicht in beiden Tabellen Schlüssel haben.
Die "innere" Verknüpfung ist der Standard und wahrscheinlich das, woran du denkst, wenn du an die Verknüpfung von Tabellen denkst. Sie erfordert, dass der Schlüssel in beiden Tabellen vorhanden ist, sonst wird das Ergebnis verworfen, wie in Beispiel 4-6 und Tabelle 4-3 gezeigt.
Beispiel 4-6. Einfache innere Verknüpfung
// Inner join implicit
df1
.
join
(
df2
,
df1
(
"name"
)
===
df2
(
"name"
))
// Inner join explicit
df1
.
join
(
df2
,
df1
(
"name"
)
===
df2
(
"name"
),
"inner"
)
Name | Größe | Name | Zip |
---|---|---|---|
Kaffee |
3.0 |
Kaffee |
10504 |
Glücklich |
1.5 |
Glücklich |
94110 |
Glücklich |
1.5 |
Glücklich |
94103 |
Glücklich |
1.0 |
Glücklich |
94110 |
Glücklich |
1.0 |
Glücklich |
94103 |
Left Outer Joins erzeugen eine Tabelle mit allen Schlüsseln aus der linken Tabelle, und alle Zeilen ohne passende Schlüssel in der rechten Tabelle haben Nullwerte in den Feldern, die von der rechten Tabelle ausgefüllt werden würden. Right Outer Joins funktionieren genauso, allerdings mit umgekehrten Voraussetzungen.Ein Beispiel für einen Left Outer Join findest du in Beispiel 4-7, das Ergebnis ist in Tabelle 4-4 dargestellt.
Beispiel 4-7. Linke äußere Verknüpfung
// Left outer join explicit
df1
.
join
(
df2
,
df1
(
"name"
)
===
df2
(
"name"
),
"left_outer"
)
Name | Größe | Name | Zip |
---|---|---|---|
Traurig |
0.9 |
null |
null |
Kaffee |
3.0 |
Kaffee |
10504 |
Glücklich |
1.0 |
Glücklich |
94110 |
Glücklich |
1.0 |
Glücklich |
94103 |
Glücklich |
1.5 |
Glücklich |
94110 |
Glücklich |
1.5 |
Glücklich |
94103 |
Ein Beispiel für einen Right Outer Join findest du in Beispiel 4-8, und. Das Ergebnis ist in Tabelle 4-5 dargestellt.
Beispiel 4-8. Rechter äußerer Join
// Right outer join explicit
df1
.
join
(
df2
,
df1
(
"name"
)
===
df2
(
"name"
),
"right_outer"
)
Name | Größe | Name | Zip |
---|---|---|---|
Kaffee |
3.0 |
Kaffee |
10504 |
Glücklich |
1.0 |
Glücklich |
94110 |
Glücklich |
1.0 |
Glücklich |
94103 |
Glücklich |
1.5 |
Glücklich |
94110 |
Glücklich |
1.5 |
Glücklich |
94103 |
null |
null |
Tee |
07012 |
Um alle Datensätze aus beiden Tabellen zu erhalten, kannst du den Full Outer Join verwenden, der zu Tabelle 4-6 führt.
Name | Größe | Name | Zip |
---|---|---|---|
Traurig |
0.9 |
null |
null |
Kaffee |
3.0 |
Kaffee |
10504 |
Glücklich |
1.0 |
Glücklich |
94110 |
Glücklich |
1.0 |
Glücklich |
94103 |
Glücklich |
1.5 |
Glücklich |
94110 |
Glücklich |
1.5 |
Glücklich |
94103 |
null |
null |
Tee |
07012 |
Linke Semi-Joins (wie in Beispiel 4-9 und Tabelle 4-7) und linke Anti-Joins (wie in Tabelle 4-8) sind die einzigen Arten von Joins, die nur Werte aus der linken Tabelle enthalten.Ein linker Semi-Join ist dasselbe wie das Filtern der linken Tabelle nach nur Zeilen mit Schlüsseln, die in der rechten Tabelle vorhanden sind. Der linke Anti-Join liefert ebenfalls nur Daten aus der linken Tabelle, gibt aber stattdessen nur Datensätze zurück, die in der rechten Tabelle nicht vorhanden sind.
Beispiel 4-9. Linke Semi-Join
// Left semi join explicit
df1
.
join
(
df2
,
df1
(
"name"
)
===
df2
(
"name"
),
"left_semi"
)
Name | Größe |
---|---|
Kaffee |
3.0 |
Glücklich |
1.0 |
Glücklich |
1.5 |
Name | Größe |
---|---|
Traurig |
0.9 |
Selbst tritt bei
Self-Joins werden auf DataFrames
unterstützt, aber wir erhalten doppelte Spaltennamen. Damit du auf die Ergebnisse zugreifen kannst, musst du die DataFrames
auf verschiedene Namen aliasieren - sonst kannst du die Spalten aufgrund von Namenskollisionen nicht auswählen (siehe Beispiel 4-10). Sobald du jedes DataFrame
aliasiert hast, kannst du im Ergebnis auf die einzelnen Spalten für jedes DataFrame
mit dfName.colName
zugreifen.
Beispiel 4-10. Selbst join
val
joined
=
df
.
as
(
"a"
).
join
(
df
.
as
(
"b"
)).
where
(
$
"a.name"
===
$
"b.name"
)
Broadcast Hash Joins
In Spark SQL kannst du die Art der Verknüpfung sehen, indem du queryExecution.executedPlan
aufrufst. Wenn eine der Tabellen viel kleiner ist als die andere, möchtest du vielleicht eine Broadcast-Hash-Verknüpfung. Du kannst Spark SQL mitteilen, dass eine bestimmte DF für die Verknüpfung Broadcast sein soll, indem du broadcast
auf DataFrame
aufrufst, bevor du sie verknüpfst (z. B. df1.join(broadcast(df2), "key")
). Spark verwendet auch automatisch die spark.sql.conf.autoBroadcastJoinThreshold
, um festzustellen, ob eine Tabelle broadcasted werden soll.
Dataset Joins
Die Verknüpfung von Datasets
erfolgt mit joinWith
und verhält sich ähnlich wie eine normale relationale Verknüpfung, außer dass das Ergebnis ein Tupel der verschiedenen Datensatztypen ist, wie in Beispiel 4-11 gezeigt. Dies ist etwas umständlicher, wenn man nach der Verknüpfung damit arbeitet, macht aber auch Self-Joins, wie in Beispiel 4-12 gezeigt, viel einfacher, da man die Spalten nicht erst mit Aliasen versehen muss.
Beispiel 4-11. Zwei Datensätze verbinden
val
result
:
Dataset
[(
RawPanda
,CoffeeShop
)]
=
pandas
.
joinWith
(
coffeeShops
,
$
"zip"
===
$
"zip"
)
Beispiel 4-12. Self join a Dataset
val
result
:
Dataset
[(
RawPanda
,RawPanda
)]
=
pandas
.
joinWith
(
pandas
,
$
"zip"
===
$
"zip"
)
Hinweis
Mit einem Self-Join und einem lit(true)
kannst du das kartesische Produkt deiner Dataset
erstellen. Das kann nützlich sein, zeigt aber auch, wie Joins (insbesondere Self-Joins) leicht zu unbrauchbaren Datengrößen führen können.
Wie bei DataFrames
kannst du die Art der Verknüpfung angeben (z. B. inner, left_outer, right_outer, left_semi) und damit festlegen, wie Datensätze behandelt werden, die nur in einer Dataset
vorhanden sind. Fehlende Datensätze werden durch Nullwerte dargestellt, also sei vorsichtig.
1 Wie das Sprichwort sagt, ist das Kreuzungsprodukt von Big Data und Big Data eine Out-of-Memory-Ausnahme.
2 Wenn die Anzahl der eindeutigen Schlüssel zu hoch ist, kannst du auch reduceByKey
verwenden, nach dem Wert sortieren und das oberste k nehmen.
3 Die Anführungszeichen sind optional und können weggelassen werden. Wir verwenden sie in unseren Beispielen, weil wir denken, dass es mit den Anführungszeichen einfacher zu lesen ist.
Get Hochleistungsfunken now with the O’Reilly learning platform.
O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.