Kapitel 4. Joins (SQL und Core)

Diese Arbeit wurde mithilfe von KI übersetzt. Wir freuen uns über dein Feedback und deine Kommentare: translation-feedback@oreilly.com

Das Verknüpfen von Daten ist ein wichtiger Bestandteil vieler unserer Pipelines, und sowohl Spark Core als auch SQL unterstützen dieselben grundlegenden Arten von Joins.Obwohl Joins sehr verbreitet und leistungsfähig sind, müssen sie in Bezug auf die Leistung besonders berücksichtigt werden, da sie große Netzwerkübertragungen erfordern oder sogar Datensätze erzeugen können, die wir nicht verarbeiten können.1 In Core Spark kann es wichtiger sein, über die Reihenfolge der Operationen nachzudenken, da der DAG-Optimierer im Gegensatz zum SQL-Optimierer nicht in der Lage ist, Filter umzuordnen oder nach unten zu verschieben.

Core Spark tritt bei

In diesem Abschnitt gehen wir auf die RDD-Joins ein.Joins sind im Allgemeinen teuer, da sie erfordern, dass die entsprechenden Schlüssel aus jedem RDD in derselben Partition liegen, damit sie lokal kombiniert werden können. Wenn die RDDs keine bekannten Partitionierer haben, müssen sie so gemischt werden, dass sich beide RDDs einen Partitionierer teilen und die Daten mit denselben Schlüsseln in denselben Partitionen liegen, wie in Abbildung 4-1 gezeigt. Wenn sie denselben Partitionierer haben, können die Daten zusammengelegt werden, wie in Abbildung 4-3, um eine Netzwerkübertragung zu vermeiden. Unabhängig davon, ob die Partitionierer identisch sind, entsteht nur eine enge Abhängigkeit, wenn eine (oder beide) der RDDs einen bekannten Partitionierer haben, wie in Abbildung 4-2. Wie bei den meisten Schlüssel/Wert-Operationen steigen die Kosten der Verknüpfung mit der Anzahl der Schlüssel und der Entfernung, die die Datensätze zurücklegen müssen, um zu ihrer richtigen Partition zu gelangen.

Join, full shuffle
Abbildung 4-1. Mischen verbinden
Join one partitioner known
Abbildung 4-2. Beide bekannten Partitionierer verbinden
Colocated join
Abbildung 4-3. Kollokierte Verbindung
Tipp

Zwei RDDs werden zusammengelegt, wenn sie denselben Partitionierer haben und im Rahmen derselben Aktion gemischt wurden.

Tipp

Core Spark-Joins werden mit der Funktion cogroup implementiert. Wir besprechen cogroup in "Co-Grouping".

Auswahl einer Verbindungsart

Die Standard-Join-Operation in Spark enthält nur Werte für Schlüssel, die in beiden RDDs vorhanden sind, und liefert bei mehreren Werten pro Schlüssel alle Permutationen des Schlüssel/Wert-Paares. Das beste Szenario für einen Standard-Join ist, wenn beide RDDs denselben Satz eindeutiger Schlüssel enthalten. Bei doppelten Schlüsseln kann sich die Datenmenge drastisch erhöhen, was zu Leistungsproblemen führen kann, und wenn ein Schlüssel nicht in beiden RDDs vorhanden ist, geht diese Datenzeile verloren. Hier sind ein paar Richtlinien:

  • Wenn beide RDDs doppelte Schlüssel haben, kann die Verknüpfung dazu führen, dass die Größe der Daten dramatisch ansteigt. Es kann besser sein, eine distinct oder combineByKey Operation durchzuführen, um den Schlüsselraum zu verkleinern, oder cogroup zu verwenden, um mit doppelten Schlüsseln umzugehen, anstatt das vollständige Kreuzprodukt zu erzeugen. Durch eine intelligente Partitionierung während des Kombinationsschritts kann ein zweites Mischen in der Verknüpfung verhindert werden (wir werden dies später im Detail besprechen).

  • Wenn die Schlüssel nicht in beiden RDDs vorhanden sind, riskierst du einen unerwarteten Datenverlust. Es kann sicherer sein, eine äußere Verknüpfung zu verwenden, so dass du garantiert alle Daten entweder im linken oder im rechten RDD behältst und die Daten nach der Verknüpfung filterst.

  • Wenn ein RDD eine einfach zu definierende Teilmenge der Schlüssel enthält, ist es vielleicht besser, wenn du vor dem Join filterst oder reduzierst, um eine große Datenmenge zu vermeiden, die du am Ende sowieso wegwerfen wirst.

Tipp

Die Verknüpfung ist eine der teuersten Operationen, die du in Spark häufig verwenden wirst. Es lohnt sich also, alles zu tun, um deine Daten zu schrumpfen, bevor du eine Verknüpfung durchführst.

Angenommen, du hast ein RDD mit Daten in der Form (Panda id, score) und ein anderes RDD mit (Panda id, address) und du möchtest jedem Panda eine Mail mit seiner besten Punktzahl schicken. Du könntest die RDDs auf id verbinden und dann die beste Punktzahl für jedes address berechnen, wie in Beispiel 4-1 gezeigt.

Beispiel 4-1. Grundlegende RDD-Verknüpfung
  def joinScoresWithAddress1( scoreRDD : RDD[(Long, Double)],
   addressRDD : RDD[(Long, String )]) : RDD[(Long, (Double, String))]= {
    val joinedRDD = scoreRDD.join(addressRDD)
    joinedRDD.reduceByKey( (x, y) => if(x._1 > y._1) x else y )
  }

Dies ist jedoch wahrscheinlich nicht so schnell, wie wenn du zuerst die Punktedaten reduzierst, so dass der erste Datensatz nur eine Zeile für jeden Panda mit seiner besten Punktzahl enthält, und dann diese Daten mit den Adressdaten verbindest (wie in Beispiel 4-2 gezeigt).

Beispiel 4-2. Vorfilter vor der Verknüpfung
  def joinScoresWithAddress2(scoreRDD : RDD[(Long, Double)],
    addressRDD: RDD[(Long, String)]) : RDD[(Long, (Double, String))]= {
   val bestScoreData = scoreRDD.reduceByKey((x, y) => if(x > y) x else y)
   bestScoreData.join(addressRDD)
  }

Wenn jeder Panda 1.000 verschiedene Punkte hatte, dann war die Größe der Mischung, die wir mit dem ersten Ansatz gemacht haben, 1.000 Mal so groß wie die Mischung, die wir mit diesem Ansatz gemacht haben!

Wenn wir wollten, könnten wir auch einen Left Outer Join durchführen, um alle Schlüssel für die Verarbeitung von zu behalten, auch die, die im rechten RDD fehlen, indem wir leftOuterJoin anstelle von join verwenden, wie in Beispiel 4-3. Spark hat auch fullOuterJoin und rightOuterJoin, je nachdem, welche Datensätze wir behalten wollen. Alle fehlenden Werte sind None und vorhandene Werte sind Some('x').

Beispiel 4-3. Einfacher RDD Left Outer Join
  def outerJoinScoresWithAddress(scoreRDD : RDD[(Long, Double)],
   addressRDD: RDD[(Long, String)]) : RDD[(Long, (Double, Option[String]))]= {
    val joinedRDD = scoreRDD.leftOuterJoin(addressRDD)
    joinedRDD.reduceByKey( (x, y) => if(x._1 > y._1) x else y )
  }

Auswahl eines Ausführungsplans

Um Daten zu verknüpfen, müssen sich die zu verknüpfenden Daten (d. h. die Daten, die auf den einzelnen Schlüsseln basieren) in derselben Partition befinden. Die Standardimplementierung eines Join in Spark ist ein Shuffled Hash Join. Die gemischte Hash-Verknüpfung stellt sicher, dass die Daten in jeder Partition die gleichen Schlüssel enthalten, indem der zweite Datensatz mit dem gleichen Standardpartitionierer wie der erste partitioniert wird, sodass die Schlüssel mit dem gleichen Hash-Wert aus beiden Datensätzen in der gleichen Partition liegen. Dieser Ansatz funktioniert zwar immer, kann aber teurer sein als nötig, weil er einen Shuffle erfordert. Der Shuffle kann vermieden werden, wenn:

  1. Beide RDDs haben einen bekannten Partitionierer.

  2. Einer der Datensätze ist so klein, dass er in den Speicher passt. In diesem Fall können wir einen Broadcast Hash Join durchführen (was das ist, erklären wir später).

Wenn die RDDs an einem Ort liegen, kann der Netzwerktransfer und das Mischen vermieden werden.

Beschleunigung von Joins durch Zuweisung eines bekannten Partitionierers

Wenn du vor der Verknüpfung eine Operation durchführen musst, die ein Shuffle erfordert, wie aggregateByKey oder reduceByKey, kannst du das Shuffle verhindern, indem du einen Hash-Partitionierer mit der gleichen Anzahl von Partitionen als explizites Argument für die erste Operation vor der Verknüpfung hinzufügst. Du könntest das Beispiel im vorherigen Abschnitt noch schneller machen, indem du den Partitionierer für die Adressdaten als Argument für den Schritt reduceByKey verwendest, wie in Beispiel 4-4 und Abbildung 4-4.

Beispiel 4-4. Bekannter Partitionierer Join
  def joinScoresWithAddress3(scoreRDD: RDD[(Long, Double)],
   addressRDD: RDD[(Long, String)]) : RDD[(Long, (Double, String))]= {
    // If addressRDD has a known partitioner we should use that,
    // otherwise it has a default hash parttioner, which we can reconstruct by
    // getting the number of partitions.
    val addressDataPartitioner = addressRDD.partitioner match {
      case (Some(p)) => p
      case (None) => new HashPartitioner(addressRDD.partitions.length)
    }
    val bestScoreData = scoreRDD.reduceByKey(addressDataPartitioner,
      (x, y) => if(x > y) x else y)
    bestScoreData.join(addressRDD)
  }
Tipp

Wenn die RDDs, die sich denselben Partitionierer teilen, durch dieselbe Aktion materialisiert werden, werden sie schließlich zusammengelegt (was sogar den Netzwerkverkehr reduzieren kann).

Tipp

bleiben nach einer Neuaufteilung (fast) immer bestehen.

Join both partitioners known
Abbildung 4-4. Beide bekannten Partitionierer verbinden

Beschleunigung von Joins mit einem Broadcast-Hash-Join

Ein Broadcast Hash Join schiebt eines der RDDs (das kleinere) zu jedem der Worker Nodes. Dann wird eine map-seitige Kombination mit jeder Partition des größeren RDDs durchgeführt. Wenn eines deiner RDDs in den Speicher passt oder dafür gesorgt werden kann, dass es in den Speicher passt, ist es immer vorteilhaft, einen Broadcast-Hash-Join durchzuführen, da kein Shuffle erforderlich ist. Manchmal (aber nicht immer) ist Spark SQL schlau genug, den Broadcast-Join selbst zu konfigurieren; in Spark SQL wird dies mit spark.sql.autoBroadcastJoinThreshold und spark.sql.broadcastTimeout gesteuert. Dies ist in Abbildung 4-5 dargestellt.

Broadcast Hash Join
Abbildung 4-5. Broadcast Hash Join

Spark Core verfügt nicht über eine Implementierung des Broadcast Hash Join. Stattdessen können wir eine Version des Broadcast Hash Join manuell implementieren, indem wir das kleinere RDD als Map an den Treiber übergeben, dann das Ergebnis übertragen und mapPartitions verwenden, um die Elemente zu kombinieren.

Beispiel 4-5 ist eine allgemeine Funktion, die verwendet werden kann, um ein größeres und ein kleineres RDD zu verbinden. Sie verhält sich genauso wie die Standard-"Join"-Operation in Spark. Wir schließen Elemente aus, deren Schlüssel nicht in beiden RDDs vorkommen.

Beispiel 4-5. Manueller Broadcast Hash Join
 def manualBroadCastHashJoin[K : Ordering : ClassTag, V1 : ClassTag,
 V2 : ClassTag](bigRDD : RDD[(K, V1)],
  smallRDD : RDD[(K, V2)])= {
  val smallRDDLocal: Map[K, V2] = smallRDD.collectAsMap()
  val smallRDDLocalBcast = bigRDD.sparkContext.broadcast(smallRDDLocal)
  bigRDD.mapPartitions(iter => {
   iter.flatMap{
    case (k,v1 ) =>
     smallRDDLocalBcast.value.get(k) match {
      case None => Seq.empty[(K, (V1, V2))]
      case Some(v2) => Seq((k, (v1, v2)))
     }
   }
  }, preservesPartitioning = true)
 }
 //end:coreBroadCast[]
}

Teilweise manueller Broadcast Hash Join

Manchmal passt nicht das gesamte kleinere RDD in den Speicher, aber einige Schlüssel sind in dem großen Datensatz so überrepräsentiert, dass du nur die häufigsten Schlüssel übertragen willst.Das ist besonders nützlich, wenn ein Schlüssel so groß ist, dass er nicht in eine einzelne Partition passt.In diesem Fall kannst du mit countByKeyApprox2 auf das große RDD anwenden, um eine ungefähre Vorstellung davon zu bekommen, welche Schlüssel am meisten von einem Broadcast profitieren würden.Dann filterst du das kleinere RDD nur nach diesen Schlüsseln und sammelst das Ergebnis lokal in einer HashMap. Mit sc.broadcast kannst du die HashMap verteilen, so dass jeder Arbeiter nur eine Kopie hat, und den Join gegen die HashMap manuell durchführen. Mit der gleichen HashMap kannst du dann dein großes RDD so filtern, dass es die vielen doppelten Schlüssel nicht enthält, und deinen Standard-Join durchführen, indem du ihn mit dem Ergebnis deines manuellen Joins zusammenführst. Dieser Ansatz ist ziemlich kompliziert, aber er ermöglicht es dir, mit stark verzerrten Daten umzugehen, die du sonst nicht verarbeiten könntest.

Spark SQL Joins

Spark SQL unterstützt dieselben grundlegenden Join-Typen wie Core Spark, aber der Optimierer ist in der Lage, mehr von der schweren Arbeit für dich zu übernehmen - obwohl du auch einen Teil deiner Kontrolle abgibst. Spark SQL kann zum Beispiel manchmal Operationen nach unten verschieben oder neu anordnen, um deine Joins effizienter zu machen. Andererseits hast du keine Kontrolle über den Partitionierer für DataFrames oder Datasets, sodass du Shuffles nicht manuell vermeiden kannst, wie du es bei Core Spark-Joins getan hast.

Datenrahmen Joins

Das Joinen von Daten zwischen DataFrames ist eine der häufigsten MultiDataFrame Transformationen. Die Standard-SQL-Join-Typen werden alle unterstützt und können als joinType in df.join(otherDf, sqlCondition, joinType) angegeben werden, wenn ein Join durchgeführt wird. Wie bei Joins zwischen RDDs führt das Joinen mit nicht eindeutigen Schlüsseln zu einem Kreuzprodukt (wenn also die linke Tabelle R1 und R2 mit Schlüssel 1 und die rechte Tabelle R3 und R5 mit Schlüssel 1 hat, erhältst du (R1, R3), (R1, R5), (R2, R3), (R2, R5)) in der Ausgabe. Während wir Spark SQL-Joins untersuchen, verwenden wir zwei Beispieltabellen von Pandas, Tabelle 4-1 und 4-2.

Warnung

Self-Joins werden zwar unterstützt, aber du musst den Feldern, an denen du interessiert bist, vorher andere Namen geben, damit du auf sie zugreifen kannst.

Tabelle 4-1. Tabelle der Pandas und Größen (unser linker Datenrahmen)
Name Größe

Glücklich

1.0

Traurig

0.9

Glücklich

1.5

Kaffee

3.0

Tabelle 4-2. Tabelle der Pandas und Postleitzahlen (unser rechter Datenrahmen)
Name Zip

Glücklich

94110

Glücklich

94103

Kaffee

10504

Tee

07012

Die von Spark unterstützten join Typen sind "inner", "left_outer" (alias "outer"), "left_anti", "right_outer", "full_outer" und "left_semi".3 Mit Ausnahme von "left_semi" verbinden alle diese Join-Typen die beiden Tabellen, aber sie verhalten sich unterschiedlich, wenn es um Zeilen geht, die nicht in beiden Tabellen Schlüssel haben.

Die "innere" Verknüpfung ist der Standard und wahrscheinlich das, woran du denkst, wenn du an die Verknüpfung von Tabellen denkst. Sie erfordert, dass der Schlüssel in beiden Tabellen vorhanden ist, sonst wird das Ergebnis verworfen, wie in Beispiel 4-6 und Tabelle 4-3 gezeigt.

Beispiel 4-6. Einfache innere Verknüpfung
    // Inner join implicit
    df1.join(df2, df1("name") === df2("name"))
    // Inner join explicit
    df1.join(df2, df1("name") === df2("name"), "inner")
Tabelle 4-3. Inner Join von df1, df2 auf Name
Name Größe Name Zip

Kaffee

3.0

Kaffee

10504

Glücklich

1.5

Glücklich

94110

Glücklich

1.5

Glücklich

94103

Glücklich

1.0

Glücklich

94110

Glücklich

1.0

Glücklich

94103

Left Outer Joins erzeugen eine Tabelle mit allen Schlüsseln aus der linken Tabelle, und alle Zeilen ohne passende Schlüssel in der rechten Tabelle haben Nullwerte in den Feldern, die von der rechten Tabelle ausgefüllt werden würden. Right Outer Joins funktionieren genauso, allerdings mit umgekehrten Voraussetzungen.Ein Beispiel für einen Left Outer Join findest du in Beispiel 4-7, das Ergebnis ist in Tabelle 4-4 dargestellt.

Beispiel 4-7. Linke äußere Verknüpfung
    // Left outer join explicit
    df1.join(df2, df1("name") === df2("name"), "left_outer")
Tabelle 4-4. Left outer join df1, df2 on name
Name Größe Name Zip

Traurig

0.9

null

null

Kaffee

3.0

Kaffee

10504

Glücklich

1.0

Glücklich

94110

Glücklich

1.0

Glücklich

94103

Glücklich

1.5

Glücklich

94110

Glücklich

1.5

Glücklich

94103

Ein Beispiel für einen Right Outer Join findest du in Beispiel 4-8, und. Das Ergebnis ist in Tabelle 4-5 dargestellt.

Beispiel 4-8. Rechter äußerer Join
    // Right outer join explicit
    df1.join(df2, df1("name") === df2("name"), "right_outer")
Tabelle 4-5. Right outer join df1, df2 on name
Name Größe Name Zip

Kaffee

3.0

Kaffee

10504

Glücklich

1.0

Glücklich

94110

Glücklich

1.0

Glücklich

94103

Glücklich

1.5

Glücklich

94110

Glücklich

1.5

Glücklich

94103

null

null

Tee

07012

Um alle Datensätze aus beiden Tabellen zu erhalten, kannst du den Full Outer Join verwenden, der zu Tabelle 4-6 führt.

Tabelle 4-6. Full outer join df1, df2 on name
Name Größe Name Zip

Traurig

0.9

null

null

Kaffee

3.0

Kaffee

10504

Glücklich

1.0

Glücklich

94110

Glücklich

1.0

Glücklich

94103

Glücklich

1.5

Glücklich

94110

Glücklich

1.5

Glücklich

94103

null

null

Tee

07012

Linke Semi-Joins (wie in Beispiel 4-9 und Tabelle 4-7) und linke Anti-Joins (wie in Tabelle 4-8) sind die einzigen Arten von Joins, die nur Werte aus der linken Tabelle enthalten.Ein linker Semi-Join ist dasselbe wie das Filtern der linken Tabelle nach nur Zeilen mit Schlüsseln, die in der rechten Tabelle vorhanden sind. Der linke Anti-Join liefert ebenfalls nur Daten aus der linken Tabelle, gibt aber stattdessen nur Datensätze zurück, die in der rechten Tabelle nicht vorhanden sind.

Beispiel 4-9. Linke Semi-Join
    // Left semi join explicit
    df1.join(df2, df1("name") === df2("name"), "left_semi")
Tabelle 4-7. Linke Semi-Fuge
Name Größe

Kaffee

3.0

Glücklich

1.0

Glücklich

1.5

Tabelle 4-8. Linke Anti-Join
Name Größe

Traurig

0.9

Selbst tritt bei

Self-Joins werden auf DataFrames unterstützt, aber wir erhalten doppelte Spaltennamen. Damit du auf die Ergebnisse zugreifen kannst, musst du die DataFrames auf verschiedene Namen aliasieren - sonst kannst du die Spalten aufgrund von Namenskollisionen nicht auswählen (siehe Beispiel 4-10). Sobald du jedes DataFrame aliasiert hast, kannst du im Ergebnis auf die einzelnen Spalten für jedes DataFrame mit dfName.colName zugreifen.

Beispiel 4-10. Selbst join
    val joined = df.as("a").join(df.as("b")).where($"a.name" === $"b.name")

Broadcast Hash Joins

In Spark SQL kannst du die Art der Verknüpfung sehen, indem du queryExecution.executedPlan aufrufst. Wenn eine der Tabellen viel kleiner ist als die andere, möchtest du vielleicht eine Broadcast-Hash-Verknüpfung. Du kannst Spark SQL mitteilen, dass eine bestimmte DF für die Verknüpfung Broadcast sein soll, indem du broadcast auf DataFrame aufrufst, bevor du sie verknüpfst (z. B. df1.join(broadcast(df2), "key")). Spark verwendet auch automatisch die spark.sql.conf.autoBroadcastJoinThreshold, um festzustellen, ob eine Tabelle broadcasted werden soll.

Dataset Joins

Die Verknüpfung von Datasets erfolgt mit joinWith und verhält sich ähnlich wie eine normale relationale Verknüpfung, außer dass das Ergebnis ein Tupel der verschiedenen Datensatztypen ist, wie in Beispiel 4-11 gezeigt. Dies ist etwas umständlicher, wenn man nach der Verknüpfung damit arbeitet, macht aber auch Self-Joins, wie in Beispiel 4-12 gezeigt, viel einfacher, da man die Spalten nicht erst mit Aliasen versehen muss.

Beispiel 4-11. Zwei Datensätze verbinden
    val result: Dataset[(RawPanda, CoffeeShop)] = pandas.joinWith(coffeeShops,
      $"zip" === $"zip")
Beispiel 4-12. Self join a Dataset
    val result: Dataset[(RawPanda, RawPanda)] = pandas.joinWith(pandas,
      $"zip" === $"zip")
Hinweis

Mit einem Self-Join und einem lit(true) kannst du das kartesische Produkt deiner Dataset erstellen. Das kann nützlich sein, zeigt aber auch, wie Joins (insbesondere Self-Joins) leicht zu unbrauchbaren Datengrößen führen können.

Wie bei DataFrames kannst du die Art der Verknüpfung angeben (z. B. inner, left_outer, right_outer, left_semi) und damit festlegen, wie Datensätze behandelt werden, die nur in einer Dataset vorhanden sind. Fehlende Datensätze werden durch Nullwerte dargestellt, also sei vorsichtig.

Fazit

Nachdem du dich nun mit Joins beschäftigt hast, ist es an der Zeit, dich mit Transformationen und den damit verbundenen Leistungsüberlegungen zu beschäftigen.

1 Wie das Sprichwort sagt, ist das Kreuzungsprodukt von Big Data und Big Data eine Out-of-Memory-Ausnahme.

2 Wenn die Anzahl der eindeutigen Schlüssel zu hoch ist, kannst du auch reduceByKey verwenden, nach dem Wert sortieren und das oberste k nehmen.

3 Die Anführungszeichen sind optional und können weggelassen werden. Wir verwenden sie in unseren Beispielen, weil wir denken, dass es mit den Anführungszeichen einfacher zu lesen ist.

Get Hochleistungsfunken now with the O’Reilly learning platform.

O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.