Kapitel 4. Reduktionen in Spark

Diese Arbeit wurde mithilfe von KI übersetzt. Wir freuen uns über dein Feedback und deine Kommentare: translation-feedback@oreilly.com

In diesem Kapitel geht es um Reduktionstransformationen auf RDDs in Spark. Insbesondere werden wir mit RDDs aus (Schlüssel, Wert)-Paaren arbeiten, die eine gängige Datenabstraktion sind, die für viele Operationen in Spark benötigt wird. Einige anfängliche ETL-Operationen können erforderlich sein, um deine Daten in eine (Schlüssel, Wert)-Form zu bringen, aber mit Paar-RDDs kannst du jede gewünschte Aggregation über eine Gruppe von Werten durchführen.

Spark unterstützt mehrere leistungsstarke Reduktionstransformationen und -aktionen. Die wichtigsten Reduktionstransformationen sind:

  • reduceByKey()

  • combineByKey()

  • groupByKey()

  • aggregateByKey()

Alle *ByKey() Transformationen akzeptieren eine QuelleRDD[(K, V)] und erstellen ein ZielRDD[(K, C)] (bei einigen Transformationen, wie reduceByKey(), V und Cist es dasselbe). Die Funktion dieser Transformationen besteht darin, alle Werte eines bestimmten Schlüssels (für alle eindeutigen Schlüssel) zu reduzieren, indem sie z. B. gefunden werden:

  • Der Durchschnitt aller Werte

  • Die Summe und Anzahl aller Werte

  • Der Modus und der Median aller Werte

  • Die Standardabweichung aller Werte

Auswahl der Reduktionstransformation

Wie bei den Mapper-Transformationen ist es wichtig, das richtige Werkzeug für die jeweilige Aufgabe auszuwählen. Für einige Reduktionsoperationen (z. B. die Ermittlung des Medians) benötigt der Reducer Zugriff auf alle Werte gleichzeitig. Für andere Operationen, wie das Ermitteln der Summe oder der Anzahl aller Werte, braucht er das nicht. Wenn du den Median der Werte pro Schlüssel finden willst, ist groupByKey()eine gute Wahl, aber diese Transformation funktioniert nicht gut, wenn ein Schlüssel viele Werte hat (was zu einem OOM-Problem führen kann). Wenn du hingegen die Summe oder die Anzahl aller Werte ermitteln willst, ist reduceByKey() eine gute Wahl: Sie fasst die Werte für jeden Schlüssel mithilfe einer assoziativen und kommutativen Reduktionsfunktion zusammen.

In diesem Kapitel lernst du anhand von einfachen PySpark-Beispielen, wie du die wichtigsten Spark-Reduktionstransformationen nutzen kannst. Wir werden uns auf die Transformationen konzentrieren, die am häufigsten in Spark-Anwendungen verwendet werden. Außerdem gehe ich auf das allgemeine Konzept der Reduktion und auf Monoide als Entwurfsprinzip für effiziente Reduktionsalgorithmen ein. Zu Beginn werden wir uns ansehen, wie man Paar-RDDs erstellt, die für die Reduktionstransformationen von Spark benötigt werden.

Paar-RDDs erstellen

Bei einer Menge von Schlüsseln und den dazugehörigen Werten reduziert eine Reduktionstransformation die Werte jedes Schlüssels mithilfe eines Algorithmus (Summe der Werte, Median der Werte usw.). Die in diesem Kapitel vorgestellten Reduktionstransformationen arbeiten also mit (Schlüssel, Wert)-Paaren, was bedeutet, dass die RDD-Elemente diesem Format entsprechen müssen. Es gibt verschiedene Möglichkeiten, Paar-RDDs in Spark zu erstellen. Zum Beispiel kannst duparallelize() auch für Sammlungen (wie Listen von Tupeln und Wörterbüchern) verwenden, wie hier gezeigt:

>>> key_value = [('A', 2), ('A', 4), ('B', 5), ('B', 7)]
>>> pair_rdd = spark.sparkContext.parallelize(key_value)
>>> pair_rdd.collect() 1
[('A', 2), ('A', 4), ('B', 5), ('B', 7)]
>>> pair_rdd.count()
4
>>> hashmap = pair_rdd.collectAsMap()
>>> hashmap
{'A': 4, 'B': 7}
1

pair_rdd hat zwei Tasten, {'A', 'B'}.

Nehmen wir an, du hast wetterbezogene Daten und möchtest Paare von (city_id, temperature) erstellen. Dazu kannst du die Transformation map() verwenden. Angenommen, deine Eingabe hat das folgende Format:

<city_id><,><latitude><,><longitude><,><temperature>

Definiere zunächst eine Funktion, um die gewünschten (Schlüssel-, Wert-)Paare zu erstellen:

def create_key_value(rec):
  tokens = rec.split(",")
  city_id = tokens[0]
  temperature = tokens[3]
  return (city_id, temperature) 1
1

Der Schlüssel ist city_id und der Wert ist temperature.

Verwende dann map(), um dein Paar-RDD zu erstellen:

input_path = <your-temperature-data-path>
rdd = spark.sparkContext.textFile(input_path)
pair_rdd = rdd.map(create_key_value)
# or you can write this using a lambda expression as:
# pair_rdd = rdd.map(lambda rec: create_key_value(rec))

Es gibt viele andere Möglichkeiten, (Schlüssel, Wert) Paar-RDDs zu erstellen: reduceByKey() Zum Beispiel nimmt RDD[(K, V)]eine Quelle an und erzeugt ein Ziel RDD[(K, V)], und combineByKey() nimmt eine Quelle RDD[(K, V)] an und erzeugt ein Ziel RDD[(K, C)].

Reduktion Transformationen

Bei einer Reduktionstransformation wird in der Regel die Datengröße von einer großen Menge von Werten (z. B. einer Liste von Zahlen) auf eine kleinere Menge reduziert. Beispiele für Reduktionen sind:

  • Summe und Durchschnitt aller Werte ermitteln

  • Ermitteln von Mittelwert, Modus und Median aller Werte

  • Berechnung von Mittelwert und Standardabweichung aller Werte

  • Finde die (min, max, count) aller Werte

  • Suche nach den Top 10 aller Werte

Kurz gesagt, entspricht eine Reduktionstransformation in etwa der Fold-Operation (auch Reduktion, Akkumulation oder Aggregation genannt) in der funktionalen Programmierung. Die Transformation wird entweder auf alle Datenelemente (z. B. beim Ermitteln der Summe aller Elemente) oder auf alle Elemente pro Schlüssel (z. B. beim Ermitteln der Summe aller Elemente pro Schlüssel) angewendet.

Eine einfache Additionsreduktion über eine Menge von Zahlen {47, 11, 42, 13} für eine einzelne Partition ist in Abbildung 4-1 dargestellt.

daws 0401
Abbildung 4-1. Eine Additionsreduzierung in einer einzelnen Partition

Abbildung 4-2 zeigt eine Reduktion, bei der die Elemente von zwei Partitionen summiert werden. Die endgültigen reduzierten Werte für Partition-1 und Partition-2 sind 21 und 18. Jede Partition führt lokale Reduktionen durch und schließlich werden die Ergebnisse der beiden Partitionen reduziert.

daws 0402
Abbildung 4-2. Eine Additionsreduktion über zwei Partitionen

Der Reducer ist ein zentrales Konzept in der funktionalen Programmierung, das verwendet wird, um eine Menge von Objekten (z. B. Zahlen, Strings oder Listen) in einen einzigen Wert umzuwandeln (z. B. die Summe von Zahlen oder die Verkettung von String-Objekten). Spark und das MapReduce-Paradigma nutzen dieses Konzept, um eine Menge von Werten zu einem einzigen Wert pro Schlüssel zu aggregieren. Betrachte die folgenden (Schlüssel, Wert) Paare, bei denen der Schlüssel eineString und der Wert eine Liste von Integerist:

(key1, [1, 2, 3])
(key2, [40, 50, 60, 70, 80])
(key3, [8])

Der einfachste Reducer ist eine Additionsfunktion über eine Menge von Werten pro Schlüssel. Nachdem wir diese Funktion angewendet haben, lautet das Ergebnis:

(key1, 6)
(key2, 300)
(key3, 8)

Oder du kannst jedes (Schlüssel, Wert) auf (Schlüssel, Paar) reduzieren, wobei das Paar(sum-of-values, count-of-values) ist:

(key1, (6, 3))
(key2, (300, 5))
(key3, (8, 1))

Reducer sind so konzipiert, dass sie gleichzeitig und unabhängig voneinander arbeiten, d.h. es gibt keine Synchronisation zwischen den Reducern. Je mehr Ressourcen ein Spark-Cluster hat, desto schneller können Reduktionen durchgeführt werden. Im schlimmsten Fall, wenn wir nur einen Reducer haben, funktioniert die Reduktion als Warteschlangenoperation. In der Regel bietet ein Cluster viele Reducer (je nach Ressourcenverfügbarkeit) für die Reduktionsumwandlung.

In MapReduce und verteilten Algorithmen ist die Reduktion eine notwendige Operation, um ein Problem zu lösen. Im MapReduce-Programmierparadigma definiert der Programmierer einen Mapper und einen Reducer mit den folgenden map()und reduce() Signaturen (beachte, dass [] eine Iterable bezeichnet):

map()

(K1, V1) → [(K2, V2)]

reduce()

(K2, [V2]) → [(K3, V3)]

Die Funktion map() bildet ein Paar (Schlüssel1, Wert1) in eine Menge von Paaren (Schlüssel2, Wert2) ab. Nachdem alle Map-Operationen abgeschlossen sind, erfolgt das Sortieren und Mischen automatisch (diese Funktion wird vom MapReduce-Paradigma bereitgestellt und nicht vom Programmierer implementiert). Die MapReduce-Sortier- und Shuffle-Phase ist der Transformation von Spark groupByKey()sehr ähnlich.

Die Funktion reduce() reduziert ein (Schlüssel2, [Wert2]) Paar in eine Menge von (Schlüssel3, Wert3) Paaren. Die Konvention wird verwendet, um eine Liste von Objekten (oder eine iterierbare Liste von Objekten) zu bezeichnen. Wir können also sagen, dass eine Reduktionstransformation eine Liste von Werten nimmt und sie auf ein greifbares Ergebnis reduziert (z. B. die Summe von Werten, den Durchschnitt von Werten oder deine gewünschte Datenstruktur).

Spark's Ermäßigungen

Spark bietet eine Vielzahl von einfach zu verwendenden Reduktions-Transformationen. Wie zu Beginn dieses Kapitels erwähnt, liegt unser Schwerpunkt auf der Reduktion von Paar-RDDs. Daher gehen wir davon aus, dass jedes RDD eine Menge von Schlüsseln und für jeden Schlüssel (z. B. K) eine Menge von Werten hat:

{ (K, V1), (K, V2), ..., (K, Vn) }

Tabelle 4-1 listet die in Spark verfügbaren Reduktionstransformationen auf.

Tabelle 4-1. Spark's Reduktions-Transformationen
Transformation Beschreibung

aggregateByKey()

Aggregiert die Werte jedes Schlüssels mit den angegebenen Kombinationsfunktionen und einem neutralen "Nullwert".

combineByKey()

Generische Funktion zum Kombinieren der Elemente für jeden Schlüssel unter Verwendung eines benutzerdefinierten Satzes von Aggregationsfunktionen

countByKey()

Zählt die Anzahl der Elemente für jeden Schlüssel und gibt das Ergebnis in Form eines Wörterbuchs an den Master zurück

foldByKey()

Führt die Werte für jeden Schlüssel mit einer assoziativen Funktion und einem neutralen "Nullwert" zusammen

groupByKey()

Fasst die Werte für jeden Schlüssel im RDD in einer einzigen Sequenz zusammen

reduceByKey()

Führt die Werte für jeden Schlüssel mithilfe einer assoziativen und kommutativen Reduktionsfunktion zusammen

sampleByKey()

Gibt eine Teilmenge dieses RDD zurück, die nach Schlüssel abgetastet wurde, wobei die Abtastraten für die verschiedenen Schlüssel durch Brüche festgelegt werden

sortByKey()

Sortiert das RDD nach Schlüssel, so dass jede Partition einen sortierten Bereich der Elemente in aufsteigender Reihenfolge enthält

Diese Transformationsfunktionen wirken alle auf (Schlüssel, Wert) Paare, die durch RDDs dargestellt werden. In diesem Kapitel befassen wir uns nur mit der Reduktion von Daten über eine Reihe von eindeutigen Schlüsseln. Nehmen wir zum Beispiel die folgenden (Schlüssel-, Wert-)Paare für den Schlüssel K:

{ (K, V1), (K, V2), ..., (K, Vn) }

gehen wir davon aus, dass K eine Liste mit n (> 0) Werten hat:

[ V1, V2, ..., Vn ]

Um es einfach zu halten, besteht das Ziel der Reduktion darin, das folgende Paar (oder die folgende Menge von Paaren) zu erzeugen:

(K, R)

wo:

f(V1, V2, ..., Vn) -> R

Die Funktion f() wirdReducer oder Reduktionsfunktion genannt. Die Reduktionstransformationen von Spark wenden diese Funktion auf eine Liste von Werten an, um den reduzierten Wert R zu finden. Beachte, dass Spark keine Reihenfolge der zu reduzierenden Werte vorgibt ([V1, V2, ..., Vn]), die reduziert werden sollen.

Dieses Kapitel enthält praktische Lösungsbeispiele, die die Verwendung der gängigsten Reduktionstransformationen von Spark demonstrieren: reduceByKey(), groupByKey(), aggregateByKey() und combineByKey(). Als Einstieg sehen wir uns ein sehr einfaches Beispiel für die Transformation groupByKey() an. Wie das Beispiel in Abbildung 4-3 zeigt, funktioniert sie ähnlich wie die SQL-Anweisung GROUP BY. In diesem Beispiel haben wir vier Schlüssel, {A, B, C, P}, und die zugehörigen Werte sind als Liste von Ganzzahlen gruppiert. Das Quell-RDD ist ein RDD[(String, Integer)], bei dem jedes Element ein Paar von (String, Integer) ist. Das Ziel-RDD ist ein RDD[(String, [Integer])], bei dem jedes Element ein Paar von (String, [Integer]) ist; der Wert ist eine iterierbare Liste von Ganzzahlen.

daws 0403
Abbildung 4-3. Die groupByKey() Transformation
Hinweis

Standardmäßig werden die reduzierten Werte bei der Spark-Reduktion nicht sortiert. In Abbildung 4-3 könnte der reduzierte Wert für den Schlüssel B zum Beispiel [4, 8] oder [8, 4] lauten. Falls gewünscht, kannst du die Werte vor der endgültigen Reduktion sortieren. Wenn dein Reduktionsalgorithmus eine Sortierung erfordert, musst du die Werte explizit sortieren.

Nachdem du nun ein allgemeines Verständnis davon hast, wie Reducer funktionieren, wollen wir uns einem praktischen Beispiel zuwenden, das zeigt, wie verschiedene Spark-Reduktionstransformationen zur Lösung eines Datenproblems verwendet werden können.

Einfaches Warmup Beispiel

Angenommen, wir haben eine Liste von Paaren (K, V), wobei K (der Schlüssel) ein String und V (der Wert) ein Integer ist:

[
 ('alex', 2), ('alex', 4), ('alex', 8),
 ('jane', 3), ('jane', 7),
 ('rafa', 1), ('rafa', 3), ('rafa', 5), ('rafa', 6),
 ('clint', 9)
]

In diesem Beispiel haben wir vier eindeutige Schlüssel:

{ 'alex', 'jane', 'rafa', 'clint' }

Angenommen, wir wollen die Werte pro Schlüssel kombinieren (summieren). Das Ergebnis dieser Reduktion wird sein:

[
 ('alex', 14),
 ('jane', 10),
 ('rafa', 15),
 ('clint', 9)
]

wo:

key: alex =>    14 = 2+4+8
key: jane =>    10 = 3+7
key: rafa =>    15 = 1+3+5+6
key: clint =>    9 (single value, no operation is done)

Es gibt viele Möglichkeiten, diese Zahlen zu addieren, um das gewünschte Ergebnis zu erhalten. Wie sind wir zu diesen reduzierten (Schlüssel, Wert) Paaren gekommen? Für dieses Beispiel können wir jede der üblichen Spark-Transformationen verwenden. Das Aggregieren oder Kombinieren der Werte pro Schlüssel ist eine Art der Reduktion - im klassischen MapReduce-Paradigma wird dies als reduce by key(oder einfach reduce) Funktion bezeichnet. Das MapReduce-Framework ruft die (benutzerdefinierte) Reduktionsfunktion der Anwendung einmal für jeden einzelnen Schlüssel auf. Die Funktion durchläuft die Werte, die mit diesem Schlüssel verknüpft sind, und erzeugt null oder mehr Ausgaben als (Schlüssel, Wert)-Paare und löst so das Problem, die Elemente jedes eindeutigen Schlüssels zu einem einzigen Wert zu kombinieren. (Beachte, dass das Ergebnis in manchen Anwendungen mehr als ein einziger Wert sein kann).

Hier stelle ich vier verschiedene Lösungen vor, die die Transformationen von Spark nutzen. Für alle Lösungen werden wir die folgenden Python data undkey_value_pairs RDD verwenden:

>>> data = 1
[
 ('alex', 2), ('alex', 4), ('alex', 8),
 ('jane', 3), ('jane', 7),
 ('rafa', 1), ('rafa', 3), ('rafa', 5), ('rafa', 6),
 ('clint', 9)
]
>>> key_value_pairs = spark.SparkContext.parallelize(data) 2
>>> key_value_pairs.collect()
[
 ('alex', 2), ('alex', 4), ('alex', 8),
 ('jane', 3), ('jane', 7),
 ('rafa', 1), ('rafa', 3), ('rafa', 5), ('rafa', 6),
 ('clint', 9)
]
1

data ist eine Python-Sammlung - eine Liste von (Schlüssel, Wert) Paaren.

2

key_value_pairs ist eine RDD[(String, Integer)].

Lösen mit reduceByKey()

Das Summieren der Werte für einen bestimmten Schlüssel ist ziemlich einfach: Addiere die ersten beiden Werte, dann den nächsten und so weiter. Sparks reduceByKey() Transformation führt die Werte für jeden Schlüssel mithilfe einer assoziativen und kommutativen Reduktionsfunktion zusammen. Kombinierer (optimierte Mini-Reduzierer) werden in allen Clusterknoten verwendet, bevor die Werte pro Partition zusammengeführt werden.

Bei der Transformation reduceByKey() ist das Quell-RDD ein RDD[(K, V)] und das Ziel-RDD ein RDD[(K, V)]. Beachte, dass die Quell- und Zieldatentypen der RDD-Werte (V) gleich sind. Dies ist eine Einschränkung von reduceByKey(), die durch die Verwendung voncombineByKey() oder aggregateByKey() vermieden werden kann.

Wir können die reduceByKey() Transformation mit einem Lambda-Ausdruck (anonyme Funktion) anwenden:

# a is (an accumulated) value for key=K
# b is a value for key=K
sum_per_key = key_value_pairs.reduceByKey(lambda a, b: a+b)
sum_per_key.collect()
[('jane', 10), ('rafa', 15), ('alex', 14), ('clint', 9)]

Alternativ können wir auch eine definierte Funktion verwenden, z. B. add:

from operator import add
sum_per_key = key_value_pairs.reduceByKey(add)
sum_per_key.collect()
[('jane', 10), ('rafa', 15), ('alex', 14), ('clint', 9)]

Das Hinzufügen von Werten pro Schlüssel durch reduceByKey() ist eine optimierte Lösung, da die Aggregation auf Partitionsebene vor der endgültigen Aggregation aller Partitionen erfolgt.

Lösen mit groupByKey()

Wir können dieses Problem auch mit der groupByKey()Transformation lösen, aber diese Lösung wird nicht so gut funktionieren, weil dabei viele Daten zu den Reducer-Knoten verschoben werden müssen (du wirst mehr darüber erfahren, warum das der Fall ist, wenn wir später in diesem Kapitel den Shuffle-Schritt besprechen).

Bei der Transformation reduceByKey() ist das Quell-RDD ein RDD[(K, V)] und das Ziel-RDD ein RDD[(K, [V])]. Beachte, dass die Quell- und Zieldatentypen nicht identisch sind: Der Wertdatentyp für das Quell-RDD ist V, während er für das Ziel-RDD [V] ist (eine Iterable/Liste von Vs).

Das folgende Beispiel demonstriert die Verwendung von groupByKey() mit einem Lambda-Ausdruck, um die Werte pro Schlüssel zu summieren:

sum_per_key = key_value_pairs
                 .grouByKey() 1
                 .mapValues(lambda values: sum(values)) 2
sum_per_key.collect()
[('jane', 10), ('rafa', 15), ('alex', 14), ('clint', 9)]
1

Werte pro Schlüssel gruppieren (ähnlich wie bei SQL GROUP BY). Jetzt hat jeder Schlüssel einen Satz von IntegerWerten; zum Beispiel werden die drei Paare{('alex', 2), ('alex', 4), ('alex', 8)}auf ein einziges Paar, ('alex', [2, 4, 8]), reduziert.

2

Füge mit der Python-Funktion sum() Werte pro Schlüssel hinzu.

Lösen mit aggregateByKey()

In der einfachsten Form ist die aggregateByKey()Transformation definiert als:

aggregateByKey(zero_value, seq_func, comb_func)

source RDD: RDD[(K, V)]
target RDD: RDD[(K, C))

Sie fasst die Werte jedes Schlüssels aus dem Quell-RDD in einem Ziel-RDD zusammen, wobei die angegebenen Kombinationsfunktionen und ein neutraler "Nullwert" (der für jede Partition verwendete Anfangswert) verwendet werden. Diese Funktion kann einen anderen Ergebnistyp (C) zurückgeben als den Typ derWerte im Quell-RDD (V), obwohl in diesem Beispiel beide Datentypen Integer sind. Wir brauchen also eine Operation für die Zusammenführung von Werten innerhalb einer einzelnen Partition (Zusammenführung von Werten des Typs V in einen Wert des Typs C) und eine Operation für die Zusammenführung von Werten zwischen Partitionen (Zusammenführung von Werten des Typs C aus mehreren Partitionen). Um eine unnötige Speicherzuweisung zu vermeiden, dürfen diese beiden Funktionen ihr erstes Argument ändern und zurückgeben, anstatt ein neues C zu erstellen.

Das folgende Beispiel zeigt die Verwendung derTransformation aggregateByKey():

# zero_value -> C
# seq_func: (C, V) -> C
# comb_func: (C, C) -> C

>>> sum_per_key = key_value_pairs.aggregateByKey(
... 0, 1
... (lambda C, V: C+V), 2
... (lambda C1, C2: C1+C2) 3
... )
>>> sum_per_key.collect()
[('jane', 10), ('rafa', 15), ('alex', 14), ('clint', 9)]
1

Die zero_value, die auf jede Partition angewendet wird, ist 0.

2

seq_func auf einer einzelnen Partition verwendet wird.

3

comb_func wird verwendet, um die Werte der Partitionen zu kombinieren.

Lösen mit combineByKey()

Die combineByKey() Transformation ist die allgemeinste und mächtigste der Reduktionstransformationen von Spark. In ihrer einfachsten Form ist sie definiert als:

combineByKey(create_combiner, merge_value, merge_combiners)

source RDD: RDD[(K, V)]
target RDD: RDD[(K, C))

Wie aggregateByKey() verwandelt auch die Transformation combineByKey() eine QuelleRDD[(K, V)] in ein Ziel RDD[(K, C)]. Auch hier können V und Cunterschiedliche Datentypen sein (das ist ein Teil der Stärke von combineByKey()- zum Beispiel kann V ein String oder Integer sein, während C eine Liste, ein Tupel oder ein Wörterbuch sein kann), aber in diesem Beispiel sind beide Integer Datentypen.

Die Schnittstelle combineByKey() ermöglicht es uns, das Verhalten der Reduktion und Kombination sowie den Datentyp anzupassen. Um diese Transformation zu nutzen, müssen wir also drei Funktionen bereitstellen:

create_combiner

Diese Funktion verwandelt ein einzelnes V in ein C (z.B. die Erstellung einer Ein-Element-Liste). Sie wird innerhalb einer einzelnen Partition verwendet, um ein C zu initialisieren.

merge_value

Diese Funktion fügt eine Vin eine C ein (z.B. indem sie am Ende einer Liste angehängt wird). Sie wird innerhalb einer einzelnen Partition verwendet, um Werte zu einer C zusammenzufassen.

merge_combiners

Diese Funktion kombiniert zwei Cs zu einer einzigen C(z.B. Zusammenführen der Listen). Sie wird beim Zusammenführen von Werten aus zwei Partitionen verwendet.

Unsere Lösung mit combineByKey() sieht so aus:

>>> sum_per_key = key_value_pairs.combineByKey(
...           (lambda v: v), 1
...           (lambda C,v: C+v), 2
...           (lambda C1,C2: C1+C2) 3
... )
>>> sum_per_key.collect()
[('jane', 10), ('rafa', 15), ('alex', 14), ('clint', 9)]
1

create_combiner erstellt die Anfangswerte in jeder Partition.

2

merge_value führt die Werte in einer Partition zusammen.

3

merge_combiners fügt die Werte aus den verschiedenen Partitionen zu einem Endergebnis zusammen.

Um dir eine bessere Vorstellung von der Leistungsfähigkeit der combineByKey()Transformation zu geben, schauen wir uns ein anderes Beispiel an. Angenommen, wir wollen den Mittelwert der Werte pro Schlüssel ermitteln. Zu diesem Zweck können wir einen kombinierten Datentyp (C) als (sum, count) erstellen, der die Summen der Werte und die dazugehörigen Zählungen enthält:

# C = combined type as (sum, count)
>>> sum_count_per_key = key_value_pairs.combineByKey(
...           (lambda v: (v, 1)),
...           (lambda C,v: (C[0]+v, C[1]+1),
...           (lambda C1,C2: (C1[0]+C2[0], C1[1]+C2[1]))
... )
>>> mean_per_key = sum_count_per_key.mapValues(lambda C: C[0]/C[1])

Anhand von drei Partitionen mit dem Namen {P1, P2, P3} zeigtAbbildung 4-4, wie ein Kombinierer (Datentyp C) erstellt wird, wie ein Wert in einen Kombinierer eingefügt wird und wie schließlich zwei Kombinierer zusammengefügt werden.

daws 0404
Abbildung 4-4. combineByKey() Transformationsbeispiel

Als Nächstes werde ich das Konzept der Monoide erläutern, das dir helfen wird zu verstehen, wie Kombinatoren bei Reduktionstransformationen funktionieren.

Was ist ein Monoid?

Monoide sind ein nützliches Konstruktionsprinzip, um effiziente MapReduce-Algorithmen zu schreiben.1 Wenn du Monoide nicht verstehst, kannst du Reducer-Algorithmen schreiben, die keine semantisch korrekten Ergebnisse liefern. Wenn dein Reducer ein Monoid ist, kannst du sicher sein, dass er in einer verteilten Umgebung korrekte Ergebnisse liefert.

Da Spark-Reduktionen auf Partitionsebene ausgeführt werden (d. h. deine Reduktionsfunktion ist verteilt und keine sequentielle Funktion), musst du sicherstellen, dass deine Reduktionsfunktion semantisch korrekt ist, um die richtige Ausgabe zu erhalten. Wir werden uns in Kürze einige Beispiele für die Verwendung von Monoiden ansehen, aber zuerst wollen wir uns das zugrunde liegende mathematische Konzept ansehen.

In der Algebra ist ein Monoid eine algebraische Struktur mit einer einzigen assoziativen binären Operation und einem Identitätselement (auch Null-Element genannt).

Für unsere Zwecke können wir ein Monoid informell definieren als M = (T, f, Zero), wobei:

  • T ist ein Datentyp.

  • f() ist eine binäre Operation: f: (T, T) -> T.

  • Zero ist eine Instanz von T.

Hinweis

Zero ist ein (neutrales) Identitätselement vom Typ T; dies ist nicht unbedingt die Zahl Null.

Wenn a, b, c und Zero vom Typ T sind, müssen die folgenden Eigenschaften erfüllt sein, damit das Tripel (T, f, Zero) ein Monoid ist:

  • Binäre Operation

    f: (T, T) -> T
  • Neutrales Element

    for all a in T:
    
    f(Zero, a) = a
    f(a, Zero) = a
  • Assoziativität

    for all a, b, c in T:
    
    f(f(a, b), c) = f(a, f(b, c))

Nicht jede binäre Operation ist ein Monoid. Zum Beispiel ist die Funktion mean() über einer Menge von ganzen Zahlen keine assoziative Funktion und daher kein Monoid, wie der folgende Beweis zeigt:

mean(10, mean(30, 50)) != mean(mean(10, 30), 50)

where

   mean(10, mean(30, 50))
      = mean (10, 40)
      = 25

   mean(mean(10, 30), 50)
      = mean (20, 50)
      = 35

   25 != 35

Was bedeutet das? Bei einerRDD[(String, Integer)] könnten wir versucht sein, die folgende Transformation zu schreiben, um einen Durchschnitt pro Schlüssel zu finden:

# rdd: RDD[(String, Integer)]
# WRONG REDUCTION to find average by key
avg_by_key = rdd.reduceByKey(lambda x, y: (x+y)/2)

Das führt aber nicht zu den richtigen Ergebnissen, denn der Durchschnitt der Durchschnitte ist kein Durchschnitt - mit anderen Worten: Die hier verwendete Mittelwert-/Durchschnittsfunktion ist kein Monoid. Nehmen wir an, dass rdd drei Elemente hat:{("A", 1), ("A", 2), ("A", 3)}; {("A", 1), ("A", 2)} sind in Partition 1 und {("A", 3)}ist in Partition 2. Wenn du die obige Lösung anwendest, erhältst du die aggregierten Werte ("A", 1.5) für Partition 1 und ("A", 3.0) für Partition 2. Wenn du die Ergebnisse für die beiden Partitionen kombinierst, erhalten wir einen endgültigen Durchschnitt von (1,5 + 3,0) / 2 = 2,25, was nicht das richtige Ergebnis ist (der Durchschnitt der drei Werte ist 2,0). Wenn dein Reducer ein Monoid ist, verhält er sich garantiert richtig und liefert korrekte Ergebnisse.

Monoide und nicht-monoide Beispiele

Um dir zu helfen, Monoide zu verstehen und zu erkennen, schauen wir uns einige monoide und nicht-monoide Beispiele an. Im Folgenden sind Beispiele für Monoide aufgeführt:

  • Ganze Zahlen mit Addition:

    ((a + b ) + c) = (a + (b + c))
    0 + n = n
    n + 0 = n
    The zero element for addition is the number 0.
  • Ganzzahlige Zahlen mit Multiplikation:

    ((a * b) * c) = (a * (b * c))
    1 * n = n
    n * 1 = n
    The zero element for multiplication is the number 1.
  • Strings mit Verkettung:

    (a + (b + c)) = ((a + b) + c)
    "" + s = s
    s + "" = s
    The zero element for concatenation is an empty string of size 0.
  • Listen mit Verkettung:

    List(a, b) + List(c, d) = List(a,b,c,d)
  • Sets mit ihrer Vereinigung:

    Set(1,2,3) + Set(2,4,5)
       = Set(1,2,3,2,4,5)
       = Set(1,2,3,4,5)
    
    S + {} = S
    {} + S = S
    The zero element is an empty set {}.

Und hier sind einige nicht-monoide Beispiele:

  • Ganzzahlen mit Mittelwertfunktion:

    mean(mean(a,b),c) != mean(a, mean(b,c))
  • Ganzzahlige Zahlen mit Subtraktion:

    ((a - b) -c) != (a - (b - c))
  • Ganzzahlige Zahlen mit Division:

    ((a / b) / c) != (a / (b / c))
  • Ganzzahlen mit Modusfunktion:

    mode(mode(a, b), c) != mode(a, mode(b, c))
  • Ganzzahlen mit Medianfunktion:

    median(median(a, b), c) != median(a, median(b, c))

In manchen Fällen ist es möglich, ein Nicht-Monoid in ein Monoid umzuwandeln. Zum Beispiel können wir mit einer einfachen Änderung unserer Datenstrukturen den korrekten Mittelwert einer Zahlenmenge ermitteln. Es gibt jedoch keinen Algorithmus, der eine nicht-monoide Struktur automatisch in ein Monoid umwandelt.

Das Schreiben verteilter Algorithmen in Spark unterscheidet sich stark vom Schreiben sequenzieller Algorithmen auf einem einzelnen Server, da die Algorithmen parallel auf partitionierten Daten arbeiten. Wenn du einen Reducer schreibst, musst du daher sicherstellen, dass deine Reduktionsfunktion ein Monoid ist. Nachdem du nun dieses wichtige Konzept verstanden hast, wollen wir uns einigen praktischen Beispielen zuwenden.

Das Filmproblem

Das Ziel dieses ersten Beispiels ist es, ein grundlegendes Problem vorzustellen und dann Lösungen mit verschiedenen Spark-Reduktionstransformationen mithilfe von PySpark anzubieten. Für alle Reduktionstransformationen habe ich die Datentypen sorgfältig so ausgewählt, dass sie ein Monoid bilden.

Das Filmproblem lässt sich wie folgt formulieren: Bei einer Menge von Nutzern, Filmen und Bewertungen (im Bereich von 1 bis 5) wollen wir die durchschnittliche Bewertung aller Filme durch einen Nutzer finden. Wenn also der Nutzer mit userID=100 vier Filme bewertet hat:

(100, "Lion King", 4.0)
(100, "Crash", 3.0)
(100, "Dead Man Walking", 3.5)
(100, "The Godfather", 4.5)

wollen wir die folgende Ausgabe erzeugen:

(100, 3.75)

wo:

3.75 = mean(4.0, 3.0, 3.5, 4.5)
     = (4.0 + 3.0 + 3.5 + 4.5) / 4
     = 15.0 / 4

Für dieses Beispiel ist zu beachten, dass die reduceByKey()Transformation über eine Menge von Bewertungen nicht immer das richtige Ergebnis liefert, da der Durchschnitt (oder Mittelwert) kein algebraisches Monoid über eine Menge von Fließkommazahlen/Ganzzahlen ist. Mit anderen Worten: Wie im vorherigen Abschnitt erläutert, ist der Mittelwert der Mittelwerte nicht gleich dem Mittelwert aller eingegebenen Zahlen.Hier ist ein einfacher Beweis. Angenommen, wir wollen den Mittelwert von sechs Werten (die Zahlen 1-6) finden, die in einer einzigen Partition gespeichert sind. Das können wir mit der Funktion mean() wie folgt erreichen:

mean(1, 2, 3, 4, 5, 6)
   = (1 + 2 + 3 + 4 + 5 + 6) / 6
   = 21 / 6
   = 3.5 [correct result]

Jetzt wollen wir mean() als verteilte Funktion einrichten. Angenommen, die Werte werden auf drei Partitionen gespeichert:

Partition-1: (1, 2, 3)
Partition-2: (4, 5)
Partition-3: (6)

Zuerst berechnen wir den Mittelwert jeder Partition:

mean(1, 2, 3, 4, 5, 6)
  =  mean (
           mean(Partition-1),
           mean(Partition-2),
           mean(Partition-3)
          )

mean(Partition-1)
  = mean(1, 2, 3)
  = mean( mean(1,2), 3)
  = mean( (1+2)/2, 3)
  = mean(1.5, 3)
  = (1.5+3)/2
  = 2.25

mean(Partition-2)
  = mean(4,5)
  = (4+5)/2
  = 4.5

mean(Partition-3)
  = mean(6)
  = 6

Dann ermitteln wir den Mittelwert dieser Werte. Sobald alle Partitionen verarbeitet sind, erhalten wir also:

mean(1, 2, 3, 4, 5, 6)
  =  mean (
           mean(Partition-1),
           mean(Partition-2),
           mean(Partition-3)
          )
  = mean(2.25, 4.5, 6)
  = mean(mean(2.25, 4.5), 6)
  = mean((2.25 + 4.5)/2, 6)
  = mean(3.375, 6)
  = (3.375 + 6)/2
  = 9.375 / 2
  = 4.6875  [incorrect result]

Um dieses Problem zu vermeiden, können wir eine monoide Datenstruktur (die Assoziativität und Kommutativität unterstützt) verwenden, z. B. ein Paar (sum, count), wobei sum die Gesamtsumme aller Zahlen ist, die wir bisher gesehen haben (pro Partition) und count die Anzahl der Bewertungen, die wir bisher gesehen haben. Wenn wir unsere mean() Funktion wie folgt definieren:

mean(pair(sum, count)) = sum / count

erhalten wir:

mean(1,2,3,4,5,6)
  = mean(mean(1,2,3), mean(4,5), mean(6))
  = mean(pair(1+2+3, 1+1+1), pair(4+5, 1+1), pair(6,1))
  = mean(pair(6, 3), pair(9, 2), pair(6,1))
  = mean(mean(pair(6, 3), pair(9, 2)), pair(6,1))
  = mean(pair(6+9, 3+2), pair(6,1))
  = mean(pair(15, 5), pair(6,1))
  = mean(pair(15+6, 5+1))
  = mean(pair(21, 6))
  = 21 / 6 = 3.5 [correct result]

Wie dieses Beispiel zeigt, können wir durch die Verwendung eines Monoids Assoziativität erreichen. Daher kannst du die reduceByKey() Transformation anwenden, wenn deine Funktion f() kommutativ und assoziativ ist:

# a = (sum1, count1)
# b = (sum2, count2)
# f(a, b) = a + b
#         = (sum1+sum2, count1+count2)
#
reduceByKey(lambda a, b: f(a, b))

Zum Beispiel ist die Addition (+) kommutativ und assoziativ, aber die Mittelwertfunktion erfüllt diese Eigenschaften nicht.

Hinweis

Wie wir in Kapitel 1 gesehen haben, stellt eine kommutative Funktion sicher, dass das Ergebnis unabhängig von der Reihenfolge der Elemente in dem zu aggregierenden RDD ist:

f(A, B) = f(B, A)

Eine assoziative Funktion stellt sicher, dass die Reihenfolge, in der die Elemente während der Aggregation gruppiert werden, keinen Einfluss auf das Endergebnis hat:

f(f(A, B), C) = f(A, f(B, C))

Zu analysierender Eingabedatensatz

Die Beispieldaten, die wir für dieses Problem verwenden werden, sind ein Datensatz vonMovieLens. Der Einfachheit halber gehe ich davon aus, dass du die Dateien heruntergeladen und in das Verzeichnis /tmp/movielens/ entpackt hast. Du musst die Dateien nicht unbedingt an dem vorgeschlagenen Ort ablegen. Du kannst deine Dateien in deinem bevorzugten Verzeichnis ablegen und deine Eingabepfade entsprechend anpassen.

Tipp

Der vollständige MovieLens-Datensatz(ml-latest.zip) ist 265 MB groß. Wenn du einen kleineren Datensatz zum Ausführen, Testen und Debuggen der hier aufgelisteten Programme verwenden möchtest, kannst du stattdessen den kleinen MovieLens-Datensatz herunterladen, eine 1 MB große Datei mit 100.000 Bewertungen und 3.600 Tag-Anwendungen, die von 600 Nutzern auf 9.000 Filme angewendet wurden.

Alle Bewertungen sind in der Dateiratings.csv enthalten. Jede Zeile dieser Datei nach der Kopfzeile steht für eine Bewertung eines Films durch einen Nutzer und hat das folgende Format:

<userId><,><movieId><,><rating><,><timestamp>

In dieser Datei:

  • Die Zeilen sind zunächst nach userId und dann für jeden Nutzer nach movieId geordnet.

  • Die Bewertung erfolgt auf einer 5-Sterne-Skala mit Abstufungen von einem halben Stern (0,5 Sterne bis 5,0 Sterne).

  • Die Zeitstempel stellen die Sekunden seit Mitternacht koordinierter Weltzeit (UTC) am 1. Januar 1970 dar (dieses Feld wird in unserer Analyse ignoriert).

Nachdem du die heruntergeladene Datei entpackt hast, solltest du die folgenden Dateien haben:

$ ls -l /tmp/movielens/
       8,305  README.txt
     725,770  links.csv
   1,729,811  movies.csv
 620,204,630  ratings.csv
  21,094,823  tags.csv

Überprüfe zunächst die Anzahl der Datensätze (je nachdem, wann du die Datei heruntergeladen hast, kann die Anzahl der angezeigten Datensätze unterschiedlich sein):

$ wc -l /tmp/movielens/ratings.csv
22,884,378 /tmp/movielens/ratings.csv

Als Nächstes wirfst du einen Blick auf die ersten paar Datensätze:

$ head -6 /tmp/movielens/ratings.csv
userId,movieId,rating,timestamp
1,169,2.5,1204927694
1,2471,3.0,1204927438
1,48516,5.0,1204927435
2,2571,3.5,1436165433
2,109487,4.0,1436165496

Da wir RDDs verwenden, brauchen wir die mit den Daten verbundenen Metadaten nicht. Daher können wir die erste Zeile (die Kopfzeile) aus der Dateiratings.csv entfernen:

$ tail -n +2 ratings.csv > ratings.csv.no.header
$ wc -l ratings.csv ratings.csv.no.header
22,884,378 ratings.csv
22,884,377 ratings.csv.no.header

Jetzt, wo wir unsere Beispieldaten haben, können wir ein paar Lösungen für dieses Problem durchspielen. Die erste Lösung wird aggregateByKey() verwenden, aber bevor wir dazu kommen, werde ich die Logik hinter dieser Transformation vorstellen.

Die aggregateByKey()-Transformation

Die aggregateByKey() Transformation von Spark initialisiert jeden Schlüssel in jeder Partition mit dem Nullwert, der ein anfänglicher kombinierter Datentyp ist (C); dies ist ein neutraler Wert, typischerweise (0, 0), wenn der kombinierte Datentyp (sum, count) ist. Dieser Nullwert wird mit dem ersten Wert in der Partition zusammengeführt, um einen neuen C zu erstellen, der dann mit dem zweiten Wert zusammengeführt wird. Dieser Prozess wird so lange fortgesetzt, bis wir alle Werte für diesen Schlüssel zusammengeführt haben. Wenn derselbe Schlüssel in mehreren Partitionen vorkommt, werden diese Werte schließlich zusammengeführt, um den endgültigen C zu erhalten.

Die Abbildungen 4-5 und 4-6 zeigen, wieaggregateByKey() mit verschiedenen Nullwerten funktioniert. Der Nullwert wird pro Schlüssel und pro Partition angewendet. Das heißt, wenn ein Schlüssel X sich in N Partitionen befindet, wird der Nullwert N mal angewendet (jede dieser N Partitionen wird mit dem Nullwert für den Schlüssel initialisiert X). Deshalb ist es wichtig, diesen Wert sorgfältig auszuwählen.

Abbildung 4-5 zeigt, wie aggregateByKey()mit zero-value=(0, 0) funktioniert.

daws 0405
Abbildung 4-5. aggregateByKey() mit zero-value=(0, 0)

Normalerweise würdest du (0, 0) verwenden, aber Abbildung 4-6 zeigt, wie die gleiche Transformation mit dem Nullwert (10, 20) funktioniert.

daws 0406
Abbildung 4-6. aggregateByKey() mit zero-value=(10, 20)

Erste Lösung mit aggregateByKey()

Um die durchschnittliche Bewertung für jeden Nutzer zu ermitteln, wird zunächst jeder Datensatz in (Schlüssel, Wert)-Paare der Form abgebildet:

(userID-as-key, rating-as-value)

Der einfachste Weg, die Werte pro Schlüssel zu addieren, ist die Transformation reduceByKey(). Wir können jedoch nicht reduceByKey()verwenden, um die durchschnittliche Bewertung pro Nutzer zu ermitteln, da, wie wir gesehen haben, die Funktion Mittelwert/Durchschnitt kein Monoid über eine Menge von Bewertungen (als Fließkommazahlen) ist. Um diese Operation zu einem Monoid zu machen, verwenden wir eine paarweise Datenstruktur (ein Tupel aus zwei Elementen), um ein Wertepaar (sum, count) zu speichern, wobei sum die aggregierte Summe der Bewertungen und countdie Anzahl der Bewertungen ist, die wir bisher hinzugefügt (summiert) haben, und wir verwenden die Transformation aggregateByKey().

Wir wollen beweisen, dass die Paarstruktur(sum, count) mit einem Additionsoperator über einer Menge von Zahlen ein Monoid ist.

Wenn wir (0.0, 0) als unser Nullelement verwenden, ist es neutral:

f(A, Zero) = A
f(Zero, A) = A

A = (sum, count)

f(A, Zero)
  = (sum+0.0, count+0)
  = (sum, count)
  = A

f(Zero, A)
  = (0.0+sum, 0+count)
  = (sum, count)
  = A

Die Operation ist kommutativ (d.h. das Ergebnis ist unabhängig von der Reihenfolge der Elemente im RDD, die aggregiert werden):

f(A, B) = f(B, A)

A = (sum1, count1)
B = (sum2, count2)

f(A, B)
  = (sum1+sum2, count1+count2)
  = (sum2+sum1, count2+count1)
  = f(B, A)

Sie ist außerdem assoziativ (die Reihenfolge, in der die Elemente zusammengefasst werden, hat keinen Einfluss auf das Endergebnis):

f(f(A, B), C) = f(A, f(B, C))

A = (sum1, count1)
B = (sum2, count2)
C = (sum3, count3)

f(f(A, B), C)
  = f((sum1+sum2, count1+count2), (sum3, count3))
  = (sum1+sum2+sum3, count1+count2+count3)
  = (sum1+(sum2+sum3), count1+(count2+count3))
  = f(A, f(B, C))

Der Einfachheit halber definieren wir eine sehr einfache Python-Funktion, create_pair(), die einen Datensatz mit Filmbewertungsdaten annimmt und ein Paar (userID, rating) zurückgibt:

# Define a function that accepts a CSV record
# and returns a pair of (userID, rating)
# Parameters: rating_record (as CSV String)
# rating_record = "userID,movieID,rating,timestamp"
def create_pair(rating_record):
	tokens = rating_record.split(",")
	userID = tokens[0]
	rating = float(tokens[2])
	return (userID, rating)
#end-def

Als Nächstes testen wir die Funktion:

key_value_1 = create_pair("3,2394,4.0,920586920")
print key_value_1
('3', 4.0)

key_value_2 = create_pair("1,169,2.5,1204927694")
print key_value_2
('1', 2.5)

Hier ist eine PySpark-Lösung, die aggregateByKey() und unsere create_pair() Funktion verwendet. Der kombinierte Typ (C), um Werte für dieaggregateByKey() Operation zu bezeichnen, ist ein Paar von (sum-of-ratings, count-of-ratings).

# spark: an instance of SparkSession
ratings_path = "/tmp/movielens/ratings.csv.no.header"
rdd = spark.sparkContext.textFile(ratings_path)
# load user-defined Python function
ratings = rdd.map(lambda rec : create_pair(rec)) 1
ratings.count()
#
# C = (C[0], C[1]) = (sum-of-ratings, count-of-ratings)
# zero_value -> C = (0.0, 0)
# seq_func: (C, V) -> C
# comb_func: (C, C) -> C
sum_count = ratings.aggregateByKey( 2
    (0.0, 0), 3
    (lambda C, V: (C[0]+V, C[1]+1)), 4
    (lambda C1, C2: (C1[0]+C2[0], C1[1]+C2[1])) 5
)
1

Das Quell-RDD, ratings, ist ein RDD[(String, Float)], wobei der Schlüssel ein userID und der Wert ein rating ist.

2

Das Ziel-RDD, sum_count, ist ein RDD[(String, (Float, Integer))], wobei der Schlüssel ein userID und der Wert ein Paar (sum-of-ratings, count-of-ratings) ist.

3

C wird in jeder Partition mit diesem Wert initialisiert.

4

Damit werden Werte innerhalb einer einzelnen Partition kombiniert.

5

Damit werden die Ergebnisse aus verschiedenen Partitionen kombiniert.

Schauen wir uns an, was hier passiert. Zunächst erstellen wir mit der Funktion aggregateByKey() eine Ergebnismenge "Template" mit den Anfangswerten. Wir beginnen mit den Daten von (0.0, 0), also ist die anfängliche Summe der Bewertungen 0.0 und die anfängliche Anzahl der Datensätze ist 0. Für jede Datenzeile fügen wir etwas hinzu. C ist die neue Vorlage, C[0] bezieht sich also auf das Element "Summe" (sum-of-ratings), während C[1] das Element "Anzahl" (count-of-ratings) ist. Zum Schluss kombinieren wir die Werte aus den verschiedenen Partitionen. Dazu fügen wir einfach die Werte von C1 zu den Werten von C2 hinzu, basierend auf der Vorlage, die wir erstellt haben.

Die Daten im sum_count RDD sehen dann wie folgt aus:

sum_count
  = [(userID, (sum-of-ratings, count-of-ratings)), ...]
  = RDD[(String, (Float, Integer))]

[
  (100, (40.0, 10)),
  (200, (51.0, 13)),
  (300, (340.0, 90)),
  ...
]

Dies sagt uns, dass der Benutzer 100 10 Filme bewertet hat und die Summe aller Bewertungen 40,0 war; der Benutzer 200 hat 13 Filme bewertet und die Summe aller Bewertungen war 51,0

Um nun die tatsächliche durchschnittliche Bewertung pro Nutzer zu erhalten, müssen wir die mapValues() Transformation verwenden und den ersten Eintrag (sum-of-ratings) durch den zweiten Eintrag (count-of-ratings) teilen:

# x =  (sum-of-ratings, count-of-ratings)
# x[0] = sum-of-ratings
# x[1] = count-of-ratings
# avg = sum-of-ratings / count-of-ratings
average_rating = sum_count.mapValues(lambda x: (x[0]/x[1])) 1
1

average_rating ist ein RDD[(String, Float)], bei dem der Schlüssel ein userID und der Wert ein average-rating ist.

Der Inhalt dieses RDDs sieht wie folgt aus und liefert uns das gewünschte Ergebnis:

average_rating
[
  (100, 4.00),
  (200, 3.92),
  (300, 3.77),
  ...
]

Zweite Lösung mit aggregateByKey()

Hier stelle ich eine weitere Lösung vor, die die aggregateByKey() Transformation verwendet. Beachte, dass ich die von der PySpark-Shell erzeugte Ausgabe aus Platzgründen gekürzt habe.

Der erste Schritt besteht darin, die Daten zu lesen und (Schlüssel, Wert)-Paare zu erstellen, wobei der Schlüssel ein userID und der Wert ein rating ist:

# ./bin/pyspark
SparkSession available as 'spark'.
>>># create_pair() returns a pair (userID, rating)
>>># rating_record = "userID,movieID,rating,timestamp"
>>> def create_pair(rating_record):
...     tokens = rating_record.split(",")
...     return (tokens[0], float(tokens[2]))
...
>>> key_value_test = create_pair("3,2394,4.0,920586920")
>>> print key_value_test
('3', 4.0)
>>> ratings_path = "/tmp/movielens/ratings.csv.no.header"
>>> rdd = spark.sparkContext.textFile(ratings_path)
>>> rdd.count()
22884377
>>> ratings = rdd.map(lambda rec : create_pair(rec))
>>> ratings.count()
22884377
>>> ratings.take(3)
[(u'1', 2.5), (u'1', 3.0), (u'1', 5.0)]

Sobald wir die (Schlüssel, Wert)-Paare erstellt haben, können wir die Transformation aggregateByKey() anwenden, um die Bewertungen zu summieren. Der Anfangswert (0.0, 0)wird für jede Partition verwendet, wobei 0.0 die Summe der Bewertungen und 0 die Anzahl der Bewertungen ist:

>>># C is a combined data structure, (sum, count)
>>> sum_count = ratings.aggregateByKey( 1
...     (0.0, 0), 2
...     (lambda C, V: (C[0]+V, C[1]+1)), 3
...     (lambda C1, C2: (C1[0]+C2[0], C1[1]+C2[1]))) 4

>>> sum_count.count()
247753

>>> sum_count.take(3)
[
 (u'145757', (148.0, 50)),
 (u'244330', (36.0, 17)),
 (u'180162', (1882.0, 489))
]
1

Das Ziel-RDD ist ein RDD[(String, (Float, Integer))].

2

C wird in jeder Partition mit (0.0, 0) initialisiert.

3

Dieser Lambda-Ausdruck fügt einen einzelnen Wert von Vzu C hinzu (verwendet in einer einzelnen Partition).

4

Dieser Lambda-Ausdruck kombiniert die Werte über die Partitionen hinweg (addiert zwei Cs, um eine einzige C zu erstellen).

Wir könnten Python-Funktionen anstelle von Lambda-Ausdrücken verwenden. Dazu müssten wir die folgenden Funktionen schreiben:

# C = (sum, count)
# V is a single value of type Float
def seq_func(C, V):
    return (C[0]+V, C[1]+1)
#end-def

# C1 = (sum1, count1)
# C2 = (sum2, count2)
def comb_func(C1, C2):
    return (C1[0]+C2[0], C1[1]+C2[1])
#end-def

Jetzt können wir sum_countmithilfe der definierten Funktionen berechnen:

sum_count = ratings.aggregateByKey(
    (0.0, 0),
    seq_func,
    comb_func
)

Im vorherigen Schritt wurden RDD-Elemente des folgenden Typs erstellt:

(userID, (sum-of-ratings, number-of-ratings))

Als Nächstes führen wir die letzte Berechnung durch, um die durchschnittliche Bewertung pro Nutzer zu ermitteln:

>>># x refers to a pair of (sum-of-ratings, number-of-ratings)
>>># where
>>>#      x[0] denotes sum-of-ratings
>>>#      x[1] denotes number-of-ratings
>>>
>>> average_rating = sum_count.mapValues(lambda x:(x[0]/x[1]))
>>> average_rating.count()
247753

>>> average_rating.take(3)
[
 (u'145757', 2.96),
 (u'244330', 2.1176470588235294),
 (u'180162', 3.8486707566462166)
]

Als Nächstes werde ich eine Lösung für das Filmproblem mit groupByKey() vorstellen.

Vollständige PySpark-Lösung mit groupByKey()

Für einen gegebenen Satz von (K, V) Paaren hatgroupByKey() die folgende Signatur:

groupByKey(numPartitions=None, partitionFunc=<function portable_hash>)
groupByKey : RDD[(K, V)] --> RDD[(K, [V])]

Wenn das Quell-RDD ein RDD[(K, V)] ist, gruppiert die groupByKey() Transformation die Werte für jeden Schlüssel (K) im RDD in eine einzige Sequenz als Liste/Aufzählung von Vs. Anschließend wird das resultierende RDD mit dem vorhandenen Partitionierer/Parallelitätslevel in Hash-Werte aufgeteilt. Die Reihenfolge der Elemente innerhalb jeder Gruppe ist nicht garantiert und kann sich sogar bei jeder Auswertung des RDDs ändern.

Tipp

Du kannst sowohl die Anzahl der Partitionen (numPartitions) als auch die Partitionierungsfunktion (partitionFunc) anpassen.

Hier stelle ich eine vollständige Lösung vor, die die groupByKey() Transformation verwendet.

Der erste Schritt besteht darin, die Daten zu lesen und (Schlüssel, Wert)-Paare zu erstellen, wobei der Schlüssel ein userID und der Wert ein rating ist:

>>># spark: SparkSession
>>> def create_pair(rating_record):
...     tokens = rating_record.split(",")
...     return (tokens[0], float(tokens[2]))
...
>>> key_value_test = create_pair("3,2394,4.0,920586920")
>>> print key_value_test
('3', 4.0)

>>> ratings_path = "/tmp/movielens/ratings.csv.no.header"
>>> rdd = spark.sparkContext.textFile(ratings_path)
>>> rdd.count()
22884377
>>> ratings = rdd.map(lambda rec : create_pair(rec)) 1
>>> ratings.count()
22884377
>>> ratings.take(3)
[
 (u'1', 2.5),
 (u'1', 3.0),
 (u'1', 5.0)
]
1

ratings ist eineRDD[(String, Float)]

Sobald wir die (Schlüssel, Wert)-Paare erstellt haben, können wir die Transformation groupByKey() anwenden, um alle Bewertungen für einen Nutzer zu gruppieren. Dieser Schritt erzeugt (userID, [R1, ..., Rn]) Paare, wobei R1, ..., Rn alle Bewertungen für ein bestimmtes userID sind.

Wie du feststellen wirst, funktioniert die groupByKey()Transformation genau wie die GROUP BY von SQL. Sie gruppiert die Werte desselben Schlüssels als eine Iterable von Werten:

>>> ratings_grouped = ratings.groupByKey() 1
>>> ratings_grouped.count()
247753
>>> ratings_grouped.take(3)
[
 (u'145757', <ResultIterable object at 0x111e42e50>), 2
 (u'244330', <ResultIterable object at 0x111e42dd0>),
 (u'180162', <ResultIterable object at 0x111e42e10>)
]
>>> ratings_grouped.mapValues(lambda x: list(x)).take(3) 3
[
 (u'145757', [2.0, 3.5, ..., 3.5, 1.0]),
 (u'244330', [3.5, 1.5, ..., 4.0, 2.0]),
 (u'180162', [5.0, 4.0, ..., 4.0, 5.0])
]
1

ratings_grouped ist ein RDD[(String, [Float])], bei dem der Schlüssel ein userID und der Wert eine Liste von ratingist.

2

Der vollständige Name von ResultIterable ist pyspark.resultiterable.ResultIterable.

3

Für die Fehlersuche konvertiere das ResultIterableObjekt in eine Liste von Integers.

Um die durchschnittliche Bewertung pro Nutzer zu ermitteln, summieren wir alle Bewertungen für jedes userID und berechnen dann die Durchschnittswerte:

>>># x refers to all ratings for a user as [R1, ..., Rn]
>>># x: ResultIterable object
>>> average_rating = ratings_grouped.mapValues(lambda x: sum(x)/len(x)) 1
>>> average_rating.count()
247753
>>> average_rating.take(3)
[
 (u'145757', 2.96),
 (u'244330', 2.12),
 (u'180162', 3.85)
]
1

average_rating ist eine RDD[(String, Float)], bei der der Schlüssel userID und der Wert average-rating ist.

Vollständige PySpark-Lösung mit reduceByKey()

In seiner einfachsten Form hat reduceByKey() die folgende Signatur (die Quell- und Zieldatentypen, V, müssen gleich sein):

reduceByKey(func, numPartitions=None, partitionFunc)
reduceByKey: RDD[(K, V)] --> RDD[(K, V)]

reduceByKey() Transformation führt die Werte für jeden Schlüssel mithilfe einer assoziativen und kommutativen Reduktionsfunktion zusammen. Auch hier wird die Zusammenführung lokal aufjedem Mapper durchgeführt, bevor die Ergebnisse an einen Reducer gesendet werden, ähnlich wie bei einem Combiner inMapReduce. Die Ausgabe wird mit numPartitionspartitioniert, oder mit dem Standard-Parallelitätslevel, wenn numPartitions nicht angegeben ist. Der Standard-Partitionierer ist HashPartitioner.

Da wir die durchschnittliche Bewertung für alle von einem Nutzer bewerteten Filme ermitteln wollen und wir wissen, dass der Mittelwert der Mittelwerte kein Mittelwert ist (die Mittelwertfunktion ist kein Monoid), müssen wir alle Bewertungen für jeden Nutzer addieren und die Anzahl der von ihm bewerteten Filme verfolgen. Dann ist (sum_of_ratings, number_of_ratings) ein Monoid über einer Additionsfunktion, aber am Ende müssen wir noch eine mapValues()Transformation durchführen, um die tatsächliche Durchschnittsbewertung zu finden, indem wir sum_of_ratings durch number_of_ratings dividieren. Die vollständige Lösung mit reduceByKey() finden Sie hier. Beachten Sie, dass reduceByKey() effizienter und skalierbarer ist als die Transformation groupByKey(), da das Zusammenführen und Kombinieren lokal erfolgt, bevor die Daten für die endgültige Reduzierung gesendet werden.

Schritt 1: Daten lesen und Paare bilden

Der erste Schritt besteht darin, die Daten zu lesen und (Schlüssel, Wert)-Paare zu erstellen, wobei der Schlüssel ein userID und der Wert ein Paar von (rating, 1) ist. Um reduceByKey()für die Suche nach Durchschnittswerten zu verwenden, müssen wir die(sum_of_ratings, number_of_ratings) finden. Wir beginnen damit, die Eingabedaten zu lesen und ein RDD[String] zu erstellen:

>>># spark: SparkSession
>>> ratings_path = "/tmp/movielens/ratings.csv.no.header"
>>># rdd: RDD[String]
>>> rdd = spark.sparkContext.textFile(ratings_path)
>>> rdd.take(3)
[
 u'1,169,2.5,1204927694',
 u'1,2471,3.0,1204927438',
 u'1,48516,5.0,1204927435'
]

Dann verwandeln wir die RDD[String] in eine RDD[(String, (Float, Integer))]:

>>> def create_combined_pair(rating_record):
...     tokens = rating_record.split(",")
...     userID = tokens[0]
...     rating = float(tokens[2])
...     return (userID, (rating, 1))
...
>>># ratings: RDD[(String, (Float, Integer))]
>>> ratings = rdd.map(lambda rec : create_combined_pair(rec)) 1
>>> ratings.count()
22884377
>>> ratings.take(3)
[
 (u'1', (2.5, 1)),
 (u'1', (3.0, 1)),
 (u'1', (5.0, 1))
]
1

Erstelle das Paar RDD.

Schritt 2: Verwende reduceByKey(), um die Bewertungen zu summieren

Sobald wir die (userID, (rating, 1)) Paare erstellt haben, können wir die reduceByKey() Transformation anwenden, um alle Bewertungen und die Anzahl der Bewertungen für einen bestimmten Nutzer zusammenzufassen. Das Ergebnis dieses Schritts sind Tupel von(userID, (sum_of_ratings,number_of_ratings)):

>>># x refers to (rating1, frequency1)
>>># y refers to (rating2, frequency2)
>>># x = (x[0] = rating1, x[1] = frequency1)
>>># y = (y[0] = rating2, y[1] = frequency2)
>>># x + y = (rating1+rating2, frequency1+frequency2)
>>># ratings is the source RDD 1
>>> sum_and_count = ratings.reduceByKey(lambda x, y: (x[0]+y[0],x[1]+y[1])) 2
>>> sum_and_count.count()
247753
>>> sum_and_count.take(3)
[
 (u'145757', (148.0, 50)),
 (u'244330', (36.0, 17)),
 (u'180162', (1882.0, 489))
]
1

Das Quell-RDD (ratings) ist einRDD[(String, (Float, Integer))].

2

Das Ziel-RDD (sum_and_count) ist einRDD[(String, (Float, Integer))]. Beachte, dass die Datentypen für die Quelle und das Ziel identisch sind.

Schritt 3: Finde die durchschnittliche Bewertung

Teile sum_of_ratings durch number_of_ratings, um die durchschnittliche Bewertung pro Nutzer zu ermitteln:

>>># x refers to (sum_of_ratings, number_of_ratings)
>>># x = (x[0] = sum_of_ratings, x[1] = number_of_ratings)
>>># avg = sum_of_ratings / number_of_ratings = x[0] / x[1]
>>> avgRating = sum_and_count.mapValues(lambda x : x[0] / x[1])
>>> avgRating.take(3)
[
 (u'145757', 2.96),
 (u'244330', 2.1176470588235294),
 (u'180162', 3.8486707566462166)
]

Komplette PySpark-Lösung mit combineByKey()

combineByKey() ist eine allgemeinere und erweiterte Version von reduceByKey(), bei der der Ergebnistyp anders sein kann als der Typ der zu aggregierenden Werte. Dies ist eine Einschränkung von reduceByKey(); es bedeutet, dass bei den folgenden

# let rdd represent (key, value) pairs
# where value is of type T
rdd2 = rdd.reduceByKey(lambda x, y: func(x,y))

func(x,y) muss einen Wert vom Typ T erzeugen.

Die combineByKey() Transformation ist eine Optimierung, bei der die Werte für einen bestimmten Schlüssel aggregiert werden, bevor die aggregierten Partitionswerte an den vorgesehenen Reduzierer gesendet werden. Diese Aggregation wird in jeder Partition durchgeführt und dann werden die Werte aus allen Partitionen zu einem einzigen Wert zusammengeführt. Wie bei reduceByKey() gibt jede Partition also höchstens einen Wert für jeden Schlüssel aus, der über das Netzwerk gesendet wird, was den Shuffle-Schritt beschleunigt. Anders als bei reduceByKey() muss der Typ des kombinierten (Ergebnis-)Wertes jedoch nicht mit dem Typ des ursprünglichen Wertes übereinstimmen.

Für eine gegebene Menge von (K, V) Paaren hat combineByKey()die folgende Signatur (diese Transformation hat viele verschiedene Versionen; dies ist die einfachste Form):

combineByKey(create_combiner, merge_value, merge_combiners)
combineByKey : RDD[(K, V)] --> RDD[(K, C)]

V and C can be different data types.

Dies ist eine allgemeine Funktion, mit der die Elemente für jeden Schlüssel unter Verwendung eines benutzerdefinierten Satzes von Aggregationsfunktionen kombiniert werden. Sie wandelt ein RDD[(K, V)] in ein Ergebnis des TypsRDD[(K, C)] um, wobei C ein kombinierter Typ ist. Das kann ein einfacher Datentyp wie Integer oder String sein, aber auch eine zusammengesetzte Datenstruktur wie ein (Schlüssel, Wert)-Paar, ein Triplett (x, y, z) oder was immer du willst. Diese Flexibilität macht combineByKey()zu einem sehr leistungsfähigen Reducer.

Wie bereits in diesem Kapitel beschrieben, müssen wir bei einem Quell-RDD RDD[(K, V)] drei grundlegende Funktionen bereitstellen:

create_combiner: (V) -> C
merge_value: (C, V) -> C
merge_combiners: (C, C) -> C

Um die Speicherzuweisung zu vermeiden, dürfen sowohl merge_valueals auch merge_combiners ihr erstes Argument ändern und zurückgeben, anstatt ein neues C zu erstellen (dadurch wird vermieden, dass neue Objekte erstellt werden, was bei einer großen Datenmenge sehr kostspielig sein kann).

Außerdem können die Nutzer (durch Angabe zusätzlicher Parameter) die Partitionierung des Ausgabe-RDD, den Serialisierer, der für das Shuffle verwendet wird, und die Durchführung einer Map-seitigen Aggregation (d. h., ob ein Mapper mehrere Elemente mit demselben Schlüssel erzeugen kann) steuern. Die Transformation combineByKey()bietet also viel Flexibilität, ist aber etwas komplexer in der Anwendung als andere Reduktionstransformationen.

Schauen wir uns an, wie wir combineByKey() nutzen können, um das Filmproblem zu lösen.

Schritt 1: Daten lesen und Paare bilden

Wie bei den vorherigen Lösungen besteht der erste Schritt darin, die Daten zu lesen und (Schlüssel, Wert)-Paare zu erstellen, wobei der Schlüssel ein userID und der Wert ein rating ist:

>>># spark: SparkSession
>>># create and return a pair of (userID, rating)
>>> def create_pair(rating_record):
...     tokens = rating_record.split(",")
...     return (tokens[0], float(tokens[2]))
...
>>> key_value_test = create_pair("3,2394,4.0,920586920")
>>> print key_value_test
('3', 4.0)

>>> ratings_path = "/tmp/movielens/ratings.csv.no.header"
>>> rdd = spark.sparkContext.textFile(ratings_path) 1
>>> rdd.count()
22884377
>>> ratings = rdd.map(lambda rec : create_pair(rec)) 2
>>> ratings.count()
22884377
>>> ratings.take(3)
[
 (u'1', 2.5),
 (u'1', 3.0),
 (u'1', 5.0)
]
1

rdd ist eine RDD[String].

2

ratings ist eine RDD[(String, Float)].

Schritt 2: Verwende combineByKey(), um die Bewertungen zusammenzufassen

Sobald wir die (userID, rating) Paare erstellt haben, können wir die combineByKey()Transformation anwenden, um alle Bewertungen und die Anzahl der Bewertungen für jeden Nutzer zusammenzufassen. Das Ergebnis dieses Schrittes sind (userID, (sum_of_ratings, number_of_ratings)) Paare:

>>># v is a rating from (userID, rating)
>>># C represents (sum_of_ratings, number_of_ratings)
>>># C[0] denotes sum_of_ratings
>>># C[1] denotes number_of_ratings
>>># ratings: source RDD  1
>>> sum_count = ratings.combineByKey( 2
          (lambda v: (v, 1)), 3
          (lambda C,v: (C[0]+v, C[1]+1)), 4
          (lambda C1,C2: (C1[0]+C2[0], C1[1]+C2[1])) 5
    )
>>> sum_count.count()
247753
>>> sum_count.take(3)
[
 (u'145757', (148.0, 50)),
 (u'244330', (36.0, 17)),
 (u'180162', (1882.0, 489))
]
1

Das Quell-RDD ist ein RDD[(String, Float)].

2

Das Ziel-RDD ist ein RDD[(String, (Float, Integer))].

3

So wird aus einem V (einem einzelnen Wert) ein C als (V, 1).

4

Damit wird eine V (Bewertung) in eine Cals (sum, count) zusammengeführt.

5

Damit werden zwei Czu einem einzigen C kombiniert.

Schritt 3: Finde die durchschnittliche Bewertung

Teile sum_of_ratings durch number_of_ratings, um die durchschnittliche Bewertung pro Nutzer zu ermitteln:

>>># x = (sum_of_ratings, number_of_ratings)
>>># x[0] = sum_of_ratings
>>># x[1] = number_of_ratings
>>># avg = sum_of_ratings / number_of_ratings
>>> average_rating = sum_count.mapValues(lambda x:(x[0] / x[1]))
>>> average_rating.take(3)
[
 (u'145757', 2.96),
 (u'244330', 2.1176470588235294),
 (u'180162', 3.8486707566462166)
]

Als Nächstes untersuchen wir den Shuffle-Schritt bei den Reduktionstransformationen von Spark.

Der Shuffle-Schritt bei Ermäßigungen

Sobald alle Mapper die Ausgabe von (Schlüssel-, Wert-)Paaren beendet haben, beginnt die Magie von MapReduce: der Sortier- und Shuffle-Schritt. In diesem Schritt wird die Ausgabe der Map-Phase nach Schlüsseln gruppiert (sortiert) und die Ergebnisse an den/die Reducer gesendet. Unter dem Gesichtspunkt der Effizienz und Skalierbarkeit ist dies für verschiedene Transformationen unterschiedlich.

Die Idee des Sortierens nach Schlüsseln sollte inzwischen bekannt sein, deshalb konzentriere ich mich hier auf das Mischen. Kurz gesagt ist das Shuffling der Prozess der Neuverteilung von Daten zwischen den Partitionen. Dabei können die Daten zwischen JVM-Prozessen oder sogar über das Internet (zwischen Executors auf verschiedenen Servern) verschoben werden, müssen es aber nicht.

Ich erkläre dir das Konzept des Shufflings anhand eines Beispiels. Stell dir vor, du hast einen Spark-Cluster mit 100 Knoten. Jeder Knoten hat Datensätze mit der Häufigkeit der URL-Besuche und du möchtest die Gesamthäufigkeit pro URL berechnen. Wie du inzwischen weißt, kannst du das erreichen, indem du die Daten liest und (Schlüssel, Wert)-Paare erstellst, wobei der Schlüssel ein URL und der Wert ein frequency ist, und dann die Häufigkeiten für jede URL zusammenzählst. Aber wenn die Daten über den Cluster verteilt sind, wie kannst du dann die Werte für denselben Schlüssel auf verschiedenen Servern zusammenrechnen? Die einzige Möglichkeit besteht darin, alle Werte für denselben Schlüssel auf denselben Server zu bringen; dann kannst du sie leicht zusammenzählen. Dieser Prozess wird Shuffling genannt.

Es gibt viele Transformationen (z. B.reduceByKey() und join()), bei denen die Daten im Cluster gemischt werden müssen, aber das kann sehr teuer sein. Das Mischen der Daten für groupByKey() unterscheidet sich vom Mischen der Daten für reduceByKey(), und dieser Unterschied wirkt sich auf die Leistung der einzelnen Transformationen aus. Daher ist es sehr wichtig, die richtigen Reduktionstransformationen auszuwählen und anzuwenden.

Betrachte die folgende PySpark-Lösung für ein einfaches Wortzählproblem:

# spark: SparkSession
# We use 5 partitions for textFile(), flatMap(), and map()
# We use 3 partitions for the reduceByKey() reduction
rdd = spark.sparkContext.textFile("input.txt", 5)\
   .flatMap(lambda line: line.split(" "))\
   .map(lambda word: (word, 1))\
   .reduceByKey(lambda a, b: a + b, 3)\ 1
   .collect()
1

3 ist die Anzahl der Partitionen.

Da wir die reduceByKey()Transformation angewiesen haben, drei Partitionen zu erstellen, wird der resultierende RDD in drei Chunks unterteilt, wie in Abbildung 4-7 dargestellt. Die RDD-Operationen werden zu einem gerichteten azyklischen Graphen von RDD-Objekten zusammengefasst, wobei jedes RDD einen Zeiger auf die übergeordneten Objekte enthält, von denen es abhängt. Wie diese Abbildung zeigt, wird der DAG an den Shuffle-Grenzen inPhasen (Phase 1, Phase 2 usw.) unterteilt, die der Reihe nach ausgeführt werden.

daws 0407
Abbildung 4-7. Das Shuffle-Konzept von Spark

Da beim Shuffling Daten über Executors und Server hinweg kopiert werden müssen, ist dies ein komplexer und kostspieliger Vorgang. Schauen wir uns genauer an, wie das bei zwei Spark-Reduktionstransformationen funktioniert: groupByKey() und reduceByKey(). So wird deutlich, wie wichtig es ist, die richtige Reduktion zu wählen.

Shuffle-Schritt für groupByKey()

Der Schritt groupByKey() shuffle ist ziemlich einfach. Hier werden die Werte für die einzelnen Schlüssel nicht zusammengeführt, sondern direkt gemischt. Das bedeutet, dass eine große Menge an Daten an jede Partition gesendet wird, da die anfänglichen Datenwerte nicht reduziert werden. Die Zusammenführung der Werte für die einzelnen Schlüssel erfolgt nach dem Shuffle-Schritt. BeigroupByKey() müssen viele Daten auf den abschließenden Worker Nodes (Reducer) gespeichert werden, was bedeutet, dass es zu OOM-Fehlern kommen kann, wenn es viele Daten pro Schlüssel gibt.Abbildung 4-8 veranschaulicht den Prozess. Beachte, dass du nach groupByKey() mapValues() aufrufen musst, um die gewünschte Endausgabe zu erzeugen.

daws 0408
Abbildung 4-8. Shuffle-Schritt für groupByKey()

Da groupByKey() keine Werte zusammenführt oder kombiniert, ist dies ein teurer Vorgang, der die Übertragung großer Datenmengen über das Netzwerk erfordert.

Shuffle-Schritt für reduceByKey()

Mit reduceByKey() werden die Daten in jeder Partition so kombiniert, dass es für jeden Schlüssel in jeder Partition höchstens einen Wert gibt. Dann werden die Daten gemischt und über das Netzwerk an die Reduzierer gesendet, wie in Abbildung 4-9 dargestellt. Beachte, dass du mitreduceByKey() nicht mapValues() aufrufen musst, um die gewünschte Endausgabe zu erzeugen. Im Allgemeinen ist es gleichwertig mit der Verwendung von groupByKey() und mapValues(), aber aufgrund der geringeren Datenmenge, die über das Netzwerk gesendet wird, ist es eine viel effizientere und leistungsfähigere Lösung.

daws 0409
Abbildung 4-9. Shuffle-Schritt für reduceByKey()

Zusammenfassung

In diesem Kapitel wurden die Reduktionstransformationen von Spark vorgestellt und mehrere Lösungen für ein reales Datenproblem mit den am häufigsten verwendeten dieser Transformationen präsentiert: reduceByKey(),aggregateByKey(), combineByKey() und groupByKey(). Wie du gesehen hast, gibt es viele Wege, um ein und dasselbe Datenproblem zu lösen, aber sie haben nicht alle die gleiche Leistung.

Tabelle 4-2 fasst die Arten von Transformationen zusammen, die von diesen vier Reduktionstransformationen durchgeführt werden (beachte, dass Vund C unterschiedliche Datentypen sein können).

Tabelle 4-2. Vergleich der Funkenreduzierung
Reduktion Quelle RDD Ziel-RDD

reduceByKey()

RDD[(K, V)]

RDD[(K, V)]

groupByKey()

RDD[(K, V)]

RDD[(K, [V])]

aggregateByKey()

RDD[(K, V)]

RDD[(K, C)]

combineByKey()

RDD[(K, V)]

RDD[(K, C)]

Wir haben gelernt, dass einige der Reduktionstransformationen (z.B. reduceByKey() und combineByKey()) groupByKey() vorzuziehen sind, da der Shuffle-Schritt für groupByKey() teurer ist. Wenn möglich, solltest du reduceByKey() anstelle von groupByKey() verwenden oder combineByKey(), wenn du Elemente kombinierst, aber der Rückgabetyp vom Typ des Eingabewerts abweicht. Insgesamt sind reduceByKey() undcombineByKey() bei großen Datenmengen leistungsfähiger und besser skalierbar als groupByKey().

Die Transformation aggregateByKey()eignet sich besser für Aggregationen nach Schlüssel, die Berechnungen erfordern, wie z. B. die Ermittlung von Summe, Durchschnitt, Varianz usw. Wichtig dabei ist, dass die zusätzlichen Berechnungen, die für die Kombination auf der Map-Seite anfallen, die Datenmenge reduzieren können, die an andere Worker Nodes und den Treiber gesendet wird.

Im nächsten Kapitel werden wir uns mit der Partitionierung von Daten befassen.

Get Datenalgorithmen mit Spark now with the O’Reilly learning platform.

O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.