Kapitel 1. Einführung in Spark und PySpark
Diese Arbeit wurde mithilfe von KI übersetzt. Wir freuen uns über dein Feedback und deine Kommentare: translation-feedback@oreilly.com
Spark ist eine leistungsstarke Analyse-Engine für die Verarbeitung großer Datenmengen, die auf Geschwindigkeit, Benutzerfreundlichkeit und Erweiterbarkeit für Big-Data-Anwendungen ausgerichtet ist. Es ist eine bewährte und weit verbreitete Technologie, die von vielen Unternehmen genutzt wird, die tagtäglich mit großen Daten arbeiten. Obwohl die "Muttersprache" von Spark Scala ist (der größte Teil von Spark wird in Scala entwickelt), bietet es auch High-Level-APIs in Java, Python und R.
In diesem Buch werden wir Python über PySpark verwenden, eine API, die das Spark-Programmiermodell für Python verfügbar macht. Da Python die am leichtesten zugängliche Programmiersprache ist und Spark über eine leistungsstarke und ausdrucksstarke API verfügt, ist PySpark aufgrund seiner Einfachheit die beste Wahl für uns. PySpark ist eine Schnittstelle für Spark in der Programmiersprache Python, die die folgenden zwei wichtigen Funktionen bietet:
-
Es ermöglicht uns, Spark-Anwendungen mit Python-APIs zu schreiben.
-
Es bietet die PySpark-Shell für die interaktive Analyse von Daten in einer verteiltenUmgebung.
Ziel dieses Kapitels ist es, PySpark als Hauptkomponente des Spark-Ökosystems vorzustellen und dir zu zeigen, dass es effektiv für Big-Data-Aufgaben wie ETL-Operationen, Indizierung von Milliarden von Dokumenten, Ingesting von Millionen von Genomen, maschinelles Lernen, Graphdatenanalyse, DNA-Datenanalyse und vieles mehr eingesetzt werden kann. Zunächst werde ich die Architekturen von Spark und PySpark vorstellen und anhand von Beispielen die Ausdrucksstärke von PySpark zeigen. Ich werde einen Überblick über die Kernfunktionen (Transformationen und Aktionen) und Konzepte von Spark geben, damit du in der Lage bist, Spark und PySpark sofort zu nutzen. Die wichtigsten Datenabstraktionen von Spark sind belastbare verteilte Datensätze (RDDs), Datenrahmen und Datensätze. Wie du sehen wirst, kannst du deine Daten (die als Hadoop-Dateien, Amazon S3-Objekte, Linux-Dateien, Datenstrukturen von Sammlungen, relationale Datenbanktabellen und mehr gespeichert sind) in beliebigen Kombinationen von RDDs undDataFrames darstellen.
Sobald deine Daten als Spark-Datenabstraktion dargestellt sind, kannst du Transformationen darauf anwenden und neue Datenabstraktionen erstellen, bis die Daten die endgültige Form haben, nach der du suchst. Mit den Spark-Transformationen (wie map()
und reduceByKey()
) kannst du deine Daten von einer Form in eine andere umwandeln, bis du das gewünschte Ergebnis erhältst. Ich werde diese Datenabstraktionen in Kürze erläutern, aber zuerst wollen wir ein wenig näher darauf eingehen, warum Spark die beste Wahl für die Datenanalyse ist.
Warum Spark für Datenanalyse
Spark ist eine leistungsstarke Analyse-Engine, die für die Verarbeitung großer Datenmengen verwendet werden kann. Die wichtigsten Gründe für den Einsatz von Spark sind:
-
Spark ist einfach, leistungsstark und schnell.
-
Spark ist kostenlos und quelloffen.
-
Spark läuft überall (Hadoop, Mesos, Kubernetes, eigenständig oder in der Cloud).
-
Spark kann Daten von/zu jeder Datenquelle lesen/schreiben (Amazon S3, Hadoop HDFS, relationale Datenbanken usw.).
-
Spark kann in fast jede Datenanwendung integriert werden.
-
Spark kann Daten in zeilenbasierten (wie Avro) und spaltenbasierten (wie Parquet und ORC) Formaten lesen/schreiben.
-
Spark verfügt über eine Reihe von einfachen APIs für alle Arten von ETL-Prozessen.
In den letzten fünf Jahren hat sich Spark so weiterentwickelt, dass ich glaube, dass es zur Lösung jedes Big-Data-Problems eingesetzt werden kann. Dafür spricht auch die Tatsache, dass alle Big-Data-Unternehmen wie Facebook, Illumina, IBM und Google Spark tagtäglich in Produktionssystemen einsetzen.
Spark ist eine der besten Entscheidungen für die Verarbeitung großer Datenmengen und für die Lösung vonMapReduce-Problemenund darüber hinaus, denn es erschließt die Macht der Daten, indem es große Daten mit leistungsstarken APIs und Geschwindigkeit verarbeitet. Die Verwendung von MapReduce/Hadoop zur Lösung von Big-Data-Problemen ist komplex, und du musst eine Menge Low-Level-Code schreiben, um selbst primitive Probleme zu lösen - hier kommt die Leistung und Einfachheit von Spark ins Spiel.Apache Sparkist wesentlich schneller als ApacheHadoop, da es In-Memory-Caching und optimierteAusführung für eine schnelle Leistung nutzt und allgemeine Batch-Verarbeitung, Streaming-Analysen, maschinelles Lernen, Graphenalgorithmen undSQL-Abfragen unterstützt.
Für PySpark gibt es in Spark zwei grundlegende Datenabstraktionen: RDD und DataFrame. Ich zeige dir, wie du deine Daten einliest und als RDD (eine Menge von Elementen desselben Typs) oder DataFrame (eine Tabelle mit Zeilen und benannten Spalten) darstellst; so kannst du einer verteilten Datensammlung eine Struktur auferlegen, die eine Abstraktion auf höherer Ebene ermöglicht. Sobald deine Daten als RDD oder DataFrame dargestellt sind, kannst du Transformationsfunktionen (wie Mapper, Filter und Reducer) darauf anwenden, um deine Daten in die gewünschte Form zu bringen. Ich werde dir viele Spark-Transformationen vorstellen, die du für ETL-Prozesse, Analysen und datenintensive Berechnungen nutzen kannst.
Einige einfache RDD-Transformationen sind in Abbildung 1-1 dargestellt.
Diese Abbildung zeigt die folgenden Transformationen:
-
Zuerst lesen wir unsere Eingabedaten (dargestellt als Textdatei sample.txt - hierzeige ich nur die ersten beiden Zeilen/Datensätze der Eingabedaten) mit einer Instanz von
SparkSession
, dem Einstiegspunkt in die Programmierung von Spark. Die InstanzSparkSession
wird alsspark
Objekt dargestellt. Beim Lesen der Eingabedaten wird ein neues RDD alsRDD[String]
erstellt: Jeder Eingabedatensatz wird in ein RDD-Element des TypsString
umgewandelt (wenn dein Eingabepfad ausN
Datensätze hat, dann ist die Anzahl der RDD-ElementeN
). Dies wird durch den folgenden Code erreicht:# Create an instance of SparkSession
spark
=
SparkSession
.
builder
.
getOrCreate
()
# Create an RDD[String], which represents all input
# records; each record becomes an RDD element
records
=
spark
.
sparkContext
.
textFile
(
"sample.txt"
)
-
Als Nächstes wandeln wir alle Zeichen in Kleinbuchstaben um. Dies geschieht durch die
map()
Transformation, die eine 1-zu-1-Transformation ist:# Convert each element of the RDD to lowercase
# x denotes a single element of the RDD
# records: source RDD[String]
# records_lowercase: target RDD[String]
records_lowercase
=
records
.
map
(
lambda
x
:
x
.
lower
())
-
Dann verwenden wir eine
flatMap()
Transformation, die eine 1-zu-viele-Transformation ist, um jedes Element (das einen einzelnen Datensatz darstellt) in eine Folge von Zielelementen (die jeweils ein Wort darstellen) umzuwandeln.DieflatMap()
Transformation gibt ein neues RDD zurück, indem sie zuerst eine Funktion (hiersplit(",")
) auf alle Elemente des Quell-RDD anwendet und dann die Ergebnisse abflacht:# Split each record into a list of words
# records_lowercase: source RDD[String]
# words: target RDD[String]
words
=
records_lowercase
.
flatMap
(
lambda
x
:
x
.
split
(
","
))
-
Schließlich lassen wir Wortelemente mit einer Länge kleiner oder gleich 2 weg. Die folgende
filter()
Transformation lässt unerwünschte Wörter weg und behält nur die mit einer Länge größer als 2:# Keep words with a length greater than 2
# x denotes a word
# words: source RDD[String]
# filtered: target RDD[String]
filtered
=
words
.
filter
(
lambda
x
:
len
(
x
)
>
2
)
Wie du sehen kannst, sind Spark-Transformationen auf hohem Niveau, leistungsstark und einfach. Spark ist von Natur aus verteilt und parallel: Deine Eingabedaten sind partitioniert und können von Transformationen (wie Mappern, Filtern und Reduktoren) parallel in einer Cluster-Umgebung verarbeitet werden. Kurz gesagt: Um ein Datenanalyseproblem in PySpark zu lösen, liest du Daten und stellst sie als RDD oder DataFrame dar (je nach Art des Datenformats). Dann schreibst du eine Reihe von Transformationen, um deine Daten in die gewünschte Ausgabe umzuwandeln. Spark partitioniert deine DataFrames und RDDs automatisch und verteilt die Partitionen auf verschiedene Clusterknoten. Partitionen sind die Grundeinheiten der Parallelität in Spark. Die Parallelität ermöglicht es Entwicklern, Aufgaben auf Hunderten von Computerservern in einem Cluster parallel und unabhängig voneinander auszuführen. Eine Partition in Spark ist ein Chunk (eine logische Unterteilung) von Daten, die auf einem Knoten im Cluster gespeichert sind. Datenrahmen (DataFrames) und RDDs sind Sammlungen von Partitionen. Spark verfügt über einen Standard-Datenpartitionierer für RDDs und DataFrames, aber du kannst diese Partitionierung mit deiner eigenenProgrammierung außer Kraft setzen.
Als Nächstes wollen wir ein wenig tiefer in das Ökosystem und die Architektur von Spark eintauchen.
Das Spark-Ökosystem
Das Ökosystem von Spark ist inAbbildung 1-2 dargestellt. Es besteht aus drei Hauptkomponenten:
- Umgebungen
-
Spark kann überall laufen und lässt sich gut in andere Umgebungen integrieren.
- Anwendungen
-
Spark lässt sich gut mit einer Vielzahl von Big-Data-Plattformen und -Anwendungen integrieren.
- Datenquellen
-
Spark kann Daten von und in viele Datenquellen lesen und schreiben.
Das umfangreiche Ökosystem von Spark macht PySpark zu einem großartigen Werkzeug für ETL, Datenanalyse und viele andere Aufgaben. Mit PySpark kannst du Daten aus vielen verschiedenen Datenquellen (dem Linux-Dateisystem, Amazon S3, dem Hadoop Distributed File System, relationalen Tabellen, MongoDB, Elasticsearch, Parquet-Dateien usw.) einlesen und als Spark-Datenabstraktion, wie z. B. RDDs oder DataFrames, darstellen. Sobald deine Daten in dieser Form vorliegen, kannst du eine Reihe von einfachen und leistungsstarken Spark-Transformationen verwenden, um die Daten in die gewünschte Form und das gewünschte Format zu bringen. Du kannst zum Beispiel die Transformation filter()
verwenden, um unerwünschte Datensätze zu entfernen, groupByKey()
, um deine Daten nach dem gewünschten Schlüssel zu gruppieren, und schließlich die Transformation mapValues()
, um die gruppierten Daten abschließend zu aggregieren (z. B. um den Durchschnitt, den Median und die Standardabweichung von Zahlen zu ermitteln). Alle diese Transformationen sind mit der einfachen, aber leistungsstarken PySpark-API möglich.
Spark Architektur
Wenn du kleine Datenmengen hast, ist es möglich, sie mit einem einzigen Computer in einer angemessenen Zeit zu analysieren. Wenn du große Datenmengen hast, kann die Analyse und Verarbeitung (und Speicherung) dieser Daten mit einem einzigen Computer unerschwinglich langsam oder sogar unmöglich sein. Aus diesem Grund wollen wir Spark verwenden.
Spark verfügt über eine Kernbibliothek und eine Reihe integrierter Bibliotheken (SQL, GraphX, Streaming, MLlib), wie in Abbildung 1-3 dargestellt. Wie du siehst, kann Spark über seine DataSource-API mit vielen Datenquellen interagieren, z. B. mit Hadoop, HBase, Amazon S3, Elasticsearch und MySQL, um nur einige zu nennen.
Diese Abbildung zeigt die wahre Stärke von Spark: Du kannst mehrere verschiedene Sprachen verwenden, um deine Spark-Anwendungen zu schreiben, und dann umfangreiche Bibliotheken nutzen, um verschiedene Big-Data-Probleme zu lösen. Gleichzeitig kannst du Daten aus einer Vielzahl von Datenquellen lesen und schreiben.
Schlüsselbegriffe
Um die Architektur von Spark zu verstehen, musst du ein paar Schlüsselbegriffe kennen:
SparkSession
-
Die Klasse
SparkSession
, die im Paketpyspark.sql
definiert ist, ist der Einstiegspunkt für die Programmierung von Spark mit den Dataset- und DataFrame-APIs. Um etwas Nützliches mit einem Spark-Cluster zu tun, musst du zunächst eine Instanz dieser Klasse erstellen, die dir Zugriff auf eine Instanz vonSparkContext
gibt.Hinweis
PySparkverfügt über eine umfassende API (bestehend aus Paketen, Modulen, Klassen und Methoden) für den Zugriff auf die Spark-API. Es ist wichtig zu wissen, dass alle Spark-APIs, Pakete, Module, Klassen und Methoden, die in diesem Buch behandelt werden, PySpark-spezifisch sind. Wenn ich zum Beispiel von der Klasse
SparkContext
spreche, meine ich die Python-Klassepyspark.SparkContext
, die im Paketpyspark
definiert ist, und wenn ich von der KlasseSparkSession
spreche, meine ich die Python-Klassepyspark.sql.SparkSession
, die im Modulpyspark.sql
definiert ist. SparkContext
-
Die Klasse
SparkContext
, die im Paketpyspark
definiert ist, ist der Haupteinstiegspunkt für Spark-Funktionen. EinSparkContext
stellt eine Verbindung zum Spark-Cluster-Manager her und kann verwendet werden, um RDDs und Broadcast-Variablen im Cluster zu erstellen. Wenn du eine Instanz vonSparkSession
erstellst, wird dieSparkContext
innerhalb deiner Sitzung als AttributSparkSession.sparkContext
verfügbar. - Treiber
-
Alle Spark-Anwendungen (einschließlich der PySpark-Shell und eigenständiger Python-Programme) laufen als unabhängige Prozesse. Diese Prozesse werden von einem
SparkContext
in einem Treiberprogramm koordiniert. Um ein eigenständiges Python-Programm an Spark zu übergeben, musst du ein Treiberprogramm mit der PySpark-API (oder Java oder Scala) schreiben. Dieses Programm ist dafür zuständig, diemain()
Funktion der Anwendung auszuführen und dieSparkContext
zu erstellen. Es kann auch zum Erstellen von RDDs und Datenrahmen verwendet werden. - Arbeiter/in
-
In einer Spark-Cluster-Umgebung gibt es zwei Arten von Knoten: einen (oder zwei, für hohe Verfügbarkeit) Master und eine Reihe von Workern. Ein Worker ist ein beliebiger Knoten, der Programme im Cluster ausführen kann. Wenn ein Prozess für eine Anwendung gestartet wird, erwirbt diese Anwendung Executors auf Worker-Knoten, die für die Ausführung von Spark-Aufgaben zuständig sind.
- Cluster Manager
-
Der "Master"-Knoten wird als Clustermanager bezeichnet. Die Hauptfunktion dieses Knotens ist die Verwaltung der Clusterumgebung und der Server, die Spark zur Ausführung von Aufgaben nutzt. Der Clustermanager weist den einzelnen Anwendungen Ressourcen zu. Spark unterstützt fünf Arten von Clustermanagern, je nachdem, wo es ausgeführt wird:
-
Standalone (Spark's eigene integrierte Cluster-Umgebung)
-
Mesos (ein Kernel für verteilte Systeme)
-
Hinweis
Die Master/Worker-Terminologie ist zwar veraltet und wird in vielen Softwarekontexten nicht mehr verwendet, aber sie ist immer noch Teil der Funktionalität von Apache Spark, weshalb ich diese Terminologie in diesem Buch verwende.
Die Architektur von Spark in aller Kürze
Eine Übersicht über die Architektur von Spark ist in Abbildung 1-4 dargestellt. Informell besteht ein Spark-Cluster aus einem Master-Knoten (dem "Clustermanager"), der für die Verwaltung der Spark-Anwendungen zuständig ist, und einer Reihe von "Worker"-Knoten (Ausführungsknoten), die für die Ausführung der Aufgaben zuständig sind, die von den Spark-Anwendungen (deinen Anwendungen, die du auf dem Spark-Cluster ausführen möchtest) eingereicht wurden.
Je nachdem, in welcher Umgebung Spark läuft, ist der Clustermanager, der diesen Servercluster verwaltet, entweder der eigenständige Clustermanager von Spark, Kubernetes, Hadoop YARN oder Mesos. Wenn der Spark-Cluster läuft, kannst du Spark-Anwendungen an den Clustermanager übermitteln, der deiner Anwendung Ressourcen zuweist, damit du deine Datenanalyse durchführen kannst.
Dein Cluster kann aus einem, zehn, hunderten oder sogar tausenden von Worker-Knoten bestehen, je nach den Bedürfnissen deines Unternehmens und deinen Projektanforderungen. Du kannst Spark auf einem eigenständigen Server wie einem MacBook, Linux- oder Windows-PC ausführen, aberin der Regel wird Spark in Produktionsumgebungen auf einem Cluster von Linux-Servern betrieben. Um ein Spark-Programm auszuführen, brauchst du Zugang zu einem Spark-Cluster und ein Treiberprogramm, das die Transformationen und Aktionen auf RDDs von Daten deklariert und diese Anfragen an den Clustermanager weiterleitet. In diesem Buch werden alle Treiberprogramme in PySpark geschrieben.
Wenn Sie eine PySpark-Shell starten (durch Ausführen von<spark-installed-dir>/bin/pyspark
), bekommst du automatisch zwei Variablen/Objekte definiert:
spark
-
Eine Instanz von
SparkSession
, die sich ideal zum Erstellen von Datenrahmen eignet sc
-
Eine Instanz von
SparkContext
, die ideal für die Erstellung von RDDs ist
Wenn du eine eigenständige PySpark-Anwendung schreibst (einen Python-Treiber, der die PySpark-API verwendet), musst du explizit selbst eine Instanz vonSparkSession
erstellen. Eine SparkSession
kann dazu verwendet werden:
-
Datenrahmen erstellen
-
Datenrahmen als Tabellen registrieren
-
SQL über Tabellen und Cache-Tabellen ausführen
-
Lesen/Schreiben von Text, CSV, JSON, Parquet und anderen Dateiformaten
-
Lesen/Schreiben von relationalen Datenbanktabellen
PySpark definiert SparkSession
als:
pyspark
.
sql
.
SparkSession
(
Python
class
,
in
pyspark
.
sql
module
)
class
pyspark
.
sql
.
SparkSession
(
sparkContext
,
jsparkSession
=
None
)
SparkSession: the entry point to programming Spark with the RDD and DataFrame API.
Um eine SparkSession
in Python zu erstellen, verwende das hier gezeigte Builder-Muster:
# import required Spark class
from
pyspark.sql
import
SparkSession
# create an instance of SparkSession as spark
spark
=
SparkSession
.
builder
\
.
master
(
"
local
"
)
\
.
appName
(
"
my-application-name
"
)
\
.
config
(
"
spark.some.config.option
"
,
"
some-value
"
)
\
.
getOrCreate
(
)
# to debug the SparkSession
(
spark
.
version
)
# create a reference to SparkContext as sc
# SparkContext is used to create new RDDs
sc
=
spark
.
sparkContext
# to debug the SparkContext
(
sc
)
Importiert die Klasse
SparkSession
aus dem Modulpyspark.sql
.Ermöglicht den Zugriff auf die Builder-API, um
SparkSession
Instanzen zu erstellen.Setzt eine
config
Option. Optionen, die mit dieser Methode gesetzt werden, werden automatisch sowohl anSparkConf
als auch an die eigene Konfiguration vonSparkSession
weitergegeben. Bei der Erstellung einesSparkSession
Objekts kannst du eine beliebige Anzahl vonconfig(<key>, <value>)
Optionen festlegen.Ruft eine bestehende
SparkSession
ab oder erstellt eine neue, wenn es keine gibt, basierend auf den hier eingestellten Optionen.Nur für Debugging-Zwecke.
Ein
SparkContext
kann von einer Instanz vonSparkSession
aus referenziert werden.
PySpark definiert SparkContext
als:
class
pyspark
.
SparkContext
(
master
=
None
,
appName
=
None
,
...
)
SparkContext: the main entry point for Spark functionality. A SparkContext represents the connection to a Spark cluster, and can be used to create RDD (the main data abstraction for Spark) and broadcast variables (such as collections and data structures) on that cluster.
SparkContext
ist der Haupteinstiegspunkt für Spark-Funktionen. Eine Shell (z. B. die PySpark-Shell) oder ein PySpark-Treiberprogramm kann nicht mehr als eine Instanz von erstellen. stellt die Verbindung zu einem Spark-Cluster dar und kann verwendet werden, um neue RDDs und Broadcast-Variablen (gemeinsame Datenstrukturen und Sammlungen - eine Art schreibgeschützte globale Variablen) auf diesem Cluster zu erstellen. SparkContext
SparkContext
Abbildung 1-5 zeigt, wie ein verwendet werden kann, um ein neues RDD aus einer Eingabetextdatei ( ) zu erstellen und es dann mit der Transformation in ein anderes RDD ( ) umzuwandeln. Wie du sehen kannst, SparkContext
records_rdd
flatMap()
words_rdd
RDD.flatMap(f)
ein neues RDD zurück, indem es zunächst eine Funktion (f
) auf alle Elemente des Quell-RDDs anwendet und dann die Ergebnisse reduziert.
Um die Objekte SparkSession
und SparkContext
zu erstellen, verwendest du das folgende Muster:
# create an instance of SparkSession
spark_session
=
SparkSession
.
builder
.
getOrCreate
()
# use the SparkSession to access the SparkContext
spark_context
=
spark_session
.
sparkContext
Wenn du nur mit RDDs arbeiten willst, kannst du eine Instanz vonSparkContext
wie folgt erstellen:
from
pyspark
import
SparkContext
spark_context
=
SparkContext
(
"local"
,
"myapp"
);
Nachdem du nun die Grundlagen von Spark kennst, wollen wir ein wenig tiefer in PySpark eintauchen.
Die Macht von PySpark
PySpark ist eine Python-API für Apache Spark, die die Zusammenarbeit zwischen Spark und der Programmiersprache Python unterstützen soll. Die meisten Datenwissenschaftler/innen kennen bereits Python, und PySpark macht es ihnen leicht, kurzen, prägnanten Code für verteilte Berechnungen mit Spark zu schreiben. PySpark ist ein All-in-One-Ökosystem, das mit seiner Unterstützung für RDDs, DataFrames, GraphFrames, MLlib, SQL und mehr komplexe Datenanforderungen bewältigen kann.
Ich werde dir die erstaunliche Leistung von PySpark anhand eines einfachen Beispiels zeigen. Angenommen, wir haben viele Datensätze mit Daten über URL-Besuche von Nutzern (die von einer Suchmaschine von vielen Webservern gesammelt wurden) im folgenden Format:
<url_address><,><frequency>
Hier sind ein paar Beispiele dafür, wie diese Aufzeichnungen aussehen:
http://mapreduce4hackers.com,19779 http://mapreduce4hackers.com,31230 http://mapreduce4hackers.com,15708 ... https://www.illumina.com,87000 https://www.illumina.com,58086 ...
Nehmen wir an, wir wollen den Durchschnitt, den Median und die Standardabweichung der Besuchszahlen pro Schlüssel (d.h. url_address
) ermitteln. Eine weitere Anforderung ist, dass wir alle Datensätze mit einer Länge von weniger als 5 verwerfen wollen (da es sich dabei um fehlerhafte URLs handeln könnte). Dafür lässt sich in PySpark leicht eine elegante Lösung finden, wie Abbildung 1-6 zeigt.
Erstellen wir zunächst einige grundlegende Python-Funktionen, die uns bei der Lösung unseres einfachen Problems helfen. Die erste Funktion, create_pair()
, akzeptiert einen einzelnen Datensatz der Form <url_address><,><frequency>
und gibt ein (Schlüssel, Wert)-Paar zurück (was uns später eineGROUP
BY
für das Schlüsselfeld ermöglicht), wobei der Schlüssel ein url_address
und der Wert das zugehörige frequency
ist:
# Create a pair of (url_address, frequency)
# where url_address is a key and frequency is a value
# record denotes a single element of RDD[String]
# record: <url_address><,><frequency>
def
create_pair
(
record
)
:
tokens
=
record
.
split
(
'
,
'
)
url_address
=
tokens
[
0
]
frequency
=
tokens
[
1
]
return
(
url_address
,
frequency
)
#end-def
Nimm einen Eintrag in der Form
<url_address><,><frequency>
an.Tokenisiere den Eingabedatensatz, indem du den
url_address
als Schlüssel (tokens[0]
) und denfrequency
als Wert (tokens[1]
) verwendest.Gib ein Paar von
(url_address, frequency)
zurück.
Die nächste Funktion, compute_stats()
, akzeptiert eine Liste von Häufigkeiten (als Zahlen) und berechnet drei Werte, den Durchschnitt, den Median und die Standardabweichung:
# Compute average, median, and standard
# deviation for a given set of numbers
import
statistics
# frequencies = [number1, number2, ...]
def
compute_stats
(
frequencies
)
:
average
=
statistics
.
mean
(
frequencies
)
median
=
statistics
.
median
(
frequencies
)
standard_deviation
=
statistics
.
stdev
(
frequencies
)
return
(
average
,
median
,
standard_deviation
)
#end-def
Dieses Modul bietet Funktionen zur Berechnung der mathematischen Statistik von numerischen Daten.
Akzeptiere eine Liste von Frequenzen.
Berechne den Durchschnitt der Frequenzen.
Berechne den Median der Häufigkeiten.
Berechne die Standardabweichung der Häufigkeiten.
Gib das Ergebnis als Tripel zurück.
Als Nächstes zeige ich dir die erstaunliche Leistung von PySpark in nur wenigen Codezeilen, indem ich Spark-Transformationen und unsere eigenen Python-Funktionen verwende:
# input_path = "s3://<bucket>/key"
input_path
=
"
/tmp/myinput.txt
"
results
=
spark
.
sparkContext
.
textFile
(
input_path
)
.
filter
(
lambda
record
:
len
(
record
)
>
5
)
.
map
(
create_pair
)
.
groupByKey
(
)
.
mapValues
(
compute_stats
)
spark
bezeichnet eine Instanz vonSparkSession
, dem Einstiegspunkt in die Programmierung von Spark.sparkContext
(ein Attribut vonSparkSession
) ist der Haupteinstiegspunkt für Spark-Funktionen.Daten als verteilte Menge von
String
Datensätzen lesen (erstellt eineRDD[String]
).Verwerfe Datensätze mit einer Länge kleiner oder gleich 5 (behalte Datensätze mit einer Länge größer als 5).
Erstelle
(url_address, frequency)
Paare aus den Eingabedatensätzen.Gruppiere die Daten nach Schlüsseln - jeder Schlüssel (
url_address
) wird mit einer Liste von Frequenzen verknüpft.Wende die Funktion
compute_stats()
auf die Liste der Frequenzen an.
Das Ergebnis ist eine Reihe von (Schlüssel, Wert)-Paaren der Form:
(url_address, (average, median, standard_deviation))
wobei url-address
ein Schlüssel und(average, median, standard_deviation)
ein Wert ist.
Hinweis
Das Wichtigste an Spark ist, dass es die Gleichzeitigkeit von Funktionen und Operationen durch die Partitionierung von Daten maximiert. Nimm ein Beispiel:
Wenn deine Eingabedaten 600 Milliarden Zeilen haben und du einen Cluster mit 10 Knoten verwendest, werden deine Eingabedaten aufgeteilt in N
( > 1) Chunks aufgeteilt, die unabhängig und parallel verarbeitet werden. Wenn N=20,000
(die Anzahl der Chunks oder Partitionen), dann hat jeder Chunk etwa 30 Millionen Datensätze/Elemente (600.000.000.000 / 20.000 = 30.000.000). Wenn du einen großen Cluster hast, können alle 20.000 Chunks auf einmal verarbeitet werden. Wenn du einen kleineren Cluster hast, kann es sein, dass nur alle 100 Chunks unabhängig und parallel verarbeitet werden können. Dieser Prozess wird fortgesetzt, bis alle 20.000 Chunks verarbeitet sind.
PySpark Architektur
PySpark baut auf der Java-API von Spark auf. Die Daten werden in Python verarbeitet und in der Java Virtual Machine (JVM) zwischengespeichert bzw. geshuffelt (das Konzept des Shuffelns werde ich in Kapitel 2 behandeln). Eine Übersicht über die Architektur von PySpark ist in Abbildung 1-7 dargestellt.
Und der Datenfluss von PySpark ist in Abbildung 1-8 dargestellt.
Im Python-Treiberprogramm (deine Spark-Anwendung in Python) verwendet SparkContext
Py4J, um eine JVM zu starten und eineJavaSparkContext
zu erstellen. Py4J wird im Treiber nur für die lokale Kommunikation zwischen den Python- und Java-Objekten SparkContext
verwendet; große Datenübertragungen werden über einen anderen Mechanismus durchgeführt. RDD-Transformationen in Python werden aufPythonRDD
Objekte in Java abgebildet. Auf entfernten Worker-Rechnern starten PythonRDD
Objekte Python-Unterprozesse und kommunizieren mit diesen über Pipes, um den Code des Benutzers und die zu verarbeitenden Daten zu senden.
Hinweis
Mit Py4J können Python-Programme, die in einem Python-Interpreter laufen, dynamisch auf Java-Objekte in einer JVM zugreifen. Methoden werden so aufgerufen, als befänden sich die Java-Objekte im Python-Interpreter, und auf Java-Sammlungen kann mit Standard-Python-Sammlungsmethoden zugegriffen werden. Py4J ermöglicht es Java-Programmen auch, Python-Objekte zurückzurufen.
Spark-Datenabstraktionen
Um Daten in der Programmiersprache Python zu manipulieren, verwendest du Ganzzahlen, Strings, Listen und Wörterbücher. Um Daten in Spark zu manipulieren und zu analysieren, musst du sie als Spark-Datensatz darstellen. Spark unterstützt drei Arten von Dataset-Abstraktionen:
-
RDD (Resilient Distributed Dataset):
-
Low-Level-API
-
Bezeichnet durch
RDD[T]
(jedes Element hat den TypT
)
-
-
Datenrahmen (ähnlich wie bei relationalen Tabellen):
-
High-Level-API
-
Bezeichnet mit
Table(column_name_1, column_name_2, ...)
-
-
Datensatz (ähnlich wie bei relationalen Tabellen):
-
High-Level-API (nicht in PySpark verfügbar)
-
Die Datenabstraktion Dataset wird in stark typisierten Sprachen wie Java verwendet und wird in PySpark nicht unterstützt. RDDs und Datenrahmen werden in den folgenden Kapiteln ausführlich behandelt, aber ich gebe hier eine kurze Einführung.
RDD Beispiele
Im Wesentlichen stellt ein RDD deine Daten als eine Sammlung von Elementen dar. Es ist eine unveränderliche Menge von verteilten Elementen des Typs T
, die als RDD[T]
bezeichnet wird.
Tabelle 1-1 zeigt Beispiele für drei einfache Arten von RDDs:
RDD[Integer]
-
Jedes Element ist ein
Integer
. RDD[String]
-
Jedes Element ist eine
String
. RDD[(String, Integer)]
-
Jedes Element ist ein Paar von
(String, Integer)
.
RDD[Integer] | RDD[String] | RDD[(String, Integer)] |
---|---|---|
|
|
|
|
|
|
|
|
|
... |
... |
... |
Tabelle 1-2 ist ein Beispiel für ein komplexes RDD. Jedes Element ist ein (Schlüssel, Wert)-Paar, wobei der Schlüssel eine String
und der Wert ein Tripel von (Integer, Integer, Double)
ist.
RDD[(String, (Integer, Integer, Double))] |
---|
|
|
|
|
... |
Spark RDD-Operationen
Spark RDDs sind schreibgeschützt, unveränderlich und verteilt. Einmal erstellt, können sie nicht mehr verändert werden: Du kannst keine Datensätze hinzufügen, löschen oder aktualisieren. Sie können jedoch umgewandelt werden. RDDs unterstützen zwei Arten von Operationen: Transformationen, die das/die Quell-RDD(s) in ein oder mehrere neue RDDs umwandeln, und Aktionen, die das/die Quell-RDD(s) in ein Nicht-RDD-Objekt wie ein Wörterbuch oder Array umwandeln. Die Beziehung zwischen RDDs, Transformationen und Aktionen ist in Abbildung 1-9 dargestellt.
Wir werden in den folgenden Kapiteln noch viel detaillierter auf die Spark-Transformationen eingehen, mit Arbeitsbeispielen, die dir helfen, sie zu verstehen, aber ich werde hier eine kurze Einführung geben.
Verwandlungen
Eine Transformation in Spark ist eine Funktion, die ein bestehendes RDD (das Quell-RDD) nimmt, eine Transformation darauf anwendet und ein neues RDD (das Ziel-RDD) erstellt. Beispiele hierfür sind: map()
,flatMap()
, groupByKey()
, reduceByKey()
undfilter()
.
Informell können wir eine Transformation wie folgt ausdrücken:
transformation: source_RDD[V] --> target_RDD[T]
RDDs werden erst ausgewertet, wenn eine Aktion auf ihnen ausgeführt wird: Das bedeutet, dass Transformationen faul ausgewertet werden. Wenn ein RDD während einer Transformation fehlschlägt, baut die Datenlinie der Transformationen das RDD neu auf.
Die meisten Spark-Transformationen erstellen ein einzelnes RDD, aber es ist auch möglich, dass sie mehrere Ziel-RDDs erstellen. Die Ziel-RDD(s) können kleiner, größer oder gleich groß wie die Quell-RDD sein.
Das folgende Beispiel zeigt eine Abfolge von Umwandlungen:
tuples
=
[(
'A'
,
7
),
(
'A'
,
8
),
(
'A'
,
-
4
),
(
'B'
,
3
),
(
'B'
,
9
),
(
'B'
,
-
1
),
(
'C'
,
1
),
(
'C'
,
5
)]
rdd
=
spark
.
sparkContext
.
parallelize
(
tuples
)
# drop negative values
positives
=
rdd
.
filter
(
lambda
x
:
x
[
1
]
>
0
)
positives
.
collect
()
[(
'A'
,
7
),
(
'A'
,
8
),
(
'B'
,
3
),
(
'B'
,
9
),
(
'C'
,
1
),
(
'C'
,
5
)]
# find sum and average per key using groupByKey()
sum_and_avg
=
positives
.
groupByKey
()
.
mapValues
(
lambda
v
:
(
sum
(
v
),
float
(
sum
(
v
))
/
len
(
v
)))
# find sum and average per key using reduceByKey()
# 1. create (sum, count) per key
sum_count
=
positives
.
mapValues
(
lambda
v
:
(
v
,
1
))
# 2. aggregate (sum, count) per key
sum_count_agg
=
sum_count
.
reduceByKey
(
lambda
x
,
y
:
(
x
[
0
]
+
y
[
0
],
x
[
1
]
+
y
[
1
]))
# 3. finalize sum and average per key
sum_and_avg
=
sum_count_agg
.
mapValues
(
lambda
v
:
(
v
[
0
],
float
(
v
[
0
])
/
v
[
1
]))
Tipp
Die Transformation groupByKey()
fasst die Werte für jeden Schlüssel im RDD in einer einzigen Sequenz zusammen, ähnlich wie eine SQL-Anweisung GROUP
BY
. Diese Transformation kann zu OOM-Fehlern (Out of Memory) führen, wenn die Daten über das Netzwerk von Spark-Servern gesendet und auf den Reducern/Workern gesammelt werden und die Anzahl der Werte pro Schlüssel in die Tausende oder Millionen geht.
Bei der Transformation reduceByKey()
werden die Daten jedoch in jeder Partition kombiniert, so dass nur eine Ausgabe für jeden Schlüssel in jeder Partition über das Netzwerk der Spark-Server gesendet werden muss. Das macht sie skalierbarer als groupByKey()
. reduceByKey()
führt die Werte für jeden Schlüssel mithilfe einer assoziativen und kommutativen Reduktionsfunktion zusammen. Sie kombiniert alle Werte (pro Schlüssel) zu einem anderen Wert mit genau demselben Datentyp (dies ist eine Einschränkung, die durch die Verwendung dercombineByKey()
Transformation überwunden werden kann). Insgesamt ist die reduceByKey()
besser skalierbar als die groupByKey()
. Wir werden in Kapitel 4 mehr über diese Themen sprechen.
Aktionen
Spark-Aktionen sind RDD-Operationen oder Funktionen, die Nicht-RDD-Werte erzeugen. Informell können wir eine Aktion wie folgt ausdrücken:
action: RDD => non-RDD value
Aktionen können die Auswertung von RDDs auslösen (die, wie du dich erinnern wirst, faul ausgewertet werden). Die Ausgabe einer Aktion ist jedoch ein konkreter Wert: eine gespeicherte Datei, ein Wert wie eine ganze Zahl, eine Anzahl von Elementen, eine Liste von Werten, ein Wörterbuch und so weiter.
Im Folgenden findest du Beispiele für Aktionen:
reduce()
-
Wendet eine Funktion an, um einen einzelnen Wert zu liefern, wie zum Beispiel das Addieren von Werten für eine bestimmte
RDD[Integer]
collect()
-
Konvertiert eine
RDD[T]
in eine Liste vom TypT
count()
-
Ermittelt die Anzahl der Elemente in einem gegebenen RDD
saveAsTextFile()
-
Speichert RDD-Elemente auf einer Festplatte
saveAsMap()
-
Speichert
RDD[(K, V)]
Elemente auf einer Festplatte alsdict[K, V]
DataFrame Beispiele
Ähnlich wie ein RDD ist ein DataFrame in Spark eine unveränderliche, verteilte Datensammlung. Im Gegensatz zu einem RDD sind die Daten jedoch in benannten Spalten organisiert, wie eine Tabelle in einer relationalen Datenbank. Dies soll die Verarbeitung großer Datensätze erleichtern. Mit Datenrahmen können Programmierer/innen einer verteilten Datensammlung eine Struktur auferlegen, die eine höhere Abstraktionsebene ermöglicht. Sie machen auch die Verarbeitung von CSV- und JSON-Dateien viel einfacher als mit RDDs.
Das folgende Beispiel eines Datenrahmens hat drei Spalten:
DataFrame[name, age, salary] name: String, age: Integer, salary: Integer +-----+----+---------+ | name| age| salary| +-----+----+---------+ | bob| 33| 45000| | jeff| 44| 78000| | mary| 40| 67000| | ...| ...| ...| +-----+----+---------+
Ein DataFrame kann aus vielen verschiedenen Quellen erstellt werden, z. B. aus Hive-Tabellen, strukturierten Datendateien (SDF), externen Datenbanken oder bestehenden RDDs. Die DataFrames-API wurde für moderne Big-Data- und Data-Science-Anwendungen entwickelt und orientiert sich an den DataFrames in R und Pandas in Python. Wie wir in späteren Kapiteln sehen werden, können wir SQL-Abfragen gegen DataFrames ausführen.
Spark SQL verfügt über ein umfangreiches Set an leistungsstarken Datenrahmen-Operationen, darunter:
-
Aggregatfunktionen (Min, Max, Summe, Durchschnitt, etc.)
-
Funktionen der Sammlung
-
Mathe-Funktionen
-
Sortierfunktionen
-
String-Funktionen
-
Benutzerdefinierte Funktionen (UDFs)
Du kannst zum Beispiel ganz einfach eine CSV-Datei lesen und daraus einen Datenrahmen erstellen:
# define input path
virus_input_path
=
"s3://mybucket/projects/cases/case.csv"
# read CSV file and create a DataFrame
cases_dataframe
=
spark
.
read
.
load
(
virus_input_path
,
format
=
"csv"
,
sep
=
","
,
inferSchema
=
"true"
,
header
=
"true"
)
# show the first 3 rows of created DataFrame
cases_dataframe
.
show
(
3
)
+-------+-------+-----------+--------------+---------+
|
case_id
|
country
|
city
|
infection_case
|
confirmed
|
+-------+-------+-----------+--------------+---------+
|
C0001
|
USA
|
New
York
|
contact
|
175
|
+-------+-------+-----------+--------------+---------+
|
C0008
|
USA
|
New
Jersey
|
unknown
|
25
|
+-------+-------+-----------+--------------+---------+
|
C0009
|
USA
|
Cupertino
|
contact
|
100
|
+-------+-------+-----------+--------------+---------+
Um die Ergebnisse nach der Anzahl der Fälle in absteigender Reihenfolge zu sortieren, können wir die Funktion sort()
verwenden:
# We can do this using the F.desc function:
from
pyspark.sql
import
functions
as
F
cases_dataframe
.
sort
(
F
.
desc
(
"confirmed"
))
.
show
()
+-------+-------+-----------+--------------+---------+
|
case_id
|
country
|
city
|
infection_case
|
confirmed
|
+-------+-------+-----------+--------------+---------+
|
C0001
|
USA
|
New
York
|
contact
|
175
|
+-------+-------+-----------+--------------+---------+
|
C0009
|
USA
|
Cupertino
|
contact
|
100
|
+-------+-------+-----------+--------------+---------+
|
C0008
|
USA
|
New
Jersey
|
unknown
|
25
|
+-------+-------+-----------+--------------+---------+
Wir können auch ganz einfach Zeilen filtern:
cases_dataframe
.
filter
((
cases_dataframe
.
confirmed
>
100
)
&
(
cases_dataframe
.
country
==
'USA'
))
.
show
()
+-------+-------+-----------+--------------+---------+
|
case_id
|
country
|
city
|
infection_case
|
confirmed
|
+-------+-------+-----------+--------------+---------+
|
C0001
|
USA
|
New
York
|
contact
|
175
|
+-------+-------+-----------+--------------+---------+
...
Um dir einen besseren Eindruck von der Leistungsfähigkeit der DataFrames von Spark zu geben, gehen wir ein Beispiel durch. Wir erstellen einen Datenrahmen und ermitteln den Durchschnitt und die Summe der Arbeitsstunden der Mitarbeiter pro Abteilung:
# Import required libraries
from
pyspark.sql
import
SparkSession
from
pyspark.sql.functions
import
avg
,
sum
# Create a DataFrame using SparkSession
spark
=
SparkSession
.
builder
.
appName
(
"demo"
)
.
getOrCreate
()
dept_emps
=
[(
"Sales"
,
"Barb"
,
40
),
(
"Sales"
,
"Dan"
,
20
),
(
"IT"
,
"Alex"
,
22
),
(
"IT"
,
"Jane"
,
24
),
(
"HR"
,
"Alex"
,
20
),
(
"HR"
,
"Mary"
,
30
)]
df
=
spark
.
createDataFrame
(
dept_emps
,
[
"dept"
,
"name"
,
"hours"
])
# Group the same depts together, aggregate their hours, and compute an average
averages
=
df
.
groupBy
(
"dept"
)
.
agg
(
avg
(
"hours"
)
.
alias
(
'average'
),
sum
(
"hours"
)
.
alias
(
'total'
))
# Show the results of the final execution
averages
.
show
()
+-----+--------+------+
|
dept
|
average
|
total
|
+-----+--------+------+
|
Sales
|
30.0
|
60.0
|
|
IT
|
23.0
|
46.0
|
|
HR
|
25.0
|
50.0
|
+-----+--------+------+
Wie du siehst, sind die Datenrahmen von Spark leistungsfähig genug, um Milliarden von Zeilen mit einfachen, aber leistungsstarken Funktionen zu bearbeiten.
Verwendung der PySpark Shell
Es gibt zwei Möglichkeiten, wie du PySpark nutzen kannst:
-
Verwende die PySpark-Shell (zum Testen und interaktiven Programmieren).
-
PySpark in einer in sich geschlossenen Anwendung verwenden. In diesem Fall schreibst du ein Python-Treiberprogramm (z. B.my_pyspark_program.py) mit Hilfe der PySpark-API und führst es dann mit dem Befehl
spark-submit
aus:export SUBMIT=$SPARK_HOME/bin/spark-submit $SUBMIT [options] my_pyspark_program.py <parameters>
wobei
<parameters>
eine Liste von Parametern ist, die von deinem PySpark-Programm(my_pyspark_program.py) verwendet werden.
Hinweis
Einzelheiten zur Verwendung des Befehls spark-submit
findest du unter"Einreichen von Anträgen" in der Spark-Dokumentation.
In diesem Abschnitt konzentrieren wir uns auf die interaktive Shell von Spark für Python-Benutzer, ein leistungsstarkes Tool, mit dem du Daten interaktiv analysieren und die Ergebnisse sofort sehen kannst (Spark bietet auch eine Scala-Shell). Die PySpark-Shell kann sowohl auf Einzelmaschinen- als auch auf Cluster-Installationen von Spark eingesetzt werden. Du startest die Shell mit folgendem Befehl, wobei SPARK_HOME
das Installationsverzeichnis von Spark auf deinem System bezeichnet:
export SPARK_HOME=<spark-installation-directory> $SPARK_HOME/bin/pyspark
Zum Beispiel:
export SPARK_HOME="/home/spark" $SPARK_HOME/bin/pyspark Python 3.7.2 Welcome to Spark version 3.1.2 Using Python version 3.7.2 SparkSession available as spark. SparkContext available as sc >>>
Wenn du die Shell startest, zeigt PySpark einige nützliche Informationen an, darunter Details zu den verwendeten Python- und Spark-Versionen (beachte, dass die Ausgabe hier gekürzt wurde). Das Symbol >>>
wird als Eingabeaufforderung für die PySpark-Shell verwendet. Diese Eingabeaufforderung zeigt an, dass du jetzt Python- oder PySpark-Befehle schreiben und die Ergebnisse ansehen kannst.
Damit du dich mit der PySpark-Shell vertraut machen kannst, führen wir dich in den folgenden Abschnitten durch einige grundlegende Anwendungsbeispiele.
Starten der PySpark Shell
Um in eine PySpark-Shell zu gelangen, führen wir pyspark
wie folgt aus:
$SPARK_HOME/bin/pyspark
Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /__ / .__/\_,_/_/ /_/\_\ version 3.1.2 /_/ SparkSession available as 'spark'. SparkContext available as 'sc'.
>>
>
sc
.
version
'
3.1.2
'
>>
>
spark
.
version
'
3.1.2
'
Wenn du
pyspark
ausführst, wird eine neue Shell erstellt. Die Ausgabe wurde hier gekürzt.Vergewissere dich, dass
SparkContext
alssc
angelegt ist.Vergewissere dich, dass
SparkSession
alsspark
angelegt ist.
Sobald du die PySpark-Shell aufrufst, wird eine Instanz von SparkSession
als Variable spark
und eine Instanz von SparkContext
als Variable sc
erstellt. Wie du bereits in diesem Kapitel gelernt hast, ist SparkSession
der Einstiegspunkt in die Programmierung von Spark mit den Dataset- und DataFrame-APIs. Mit SparkSession
kannst du Datenrahmen erstellen, Datenrahmen als Tabellen registrieren, SQL über Tabellen ausführen, Tabellen zwischenspeichern und CSV-, JSON- und Parquet-Dateien lesen.
Wenn du PySpark in einer eigenständigen Anwendung verwenden möchtest, musst du explizit ein SparkSession
erstellen, indem du das in "Spark-Architektur in Kürze" gezeigte Builder-Muster verwendest . SparkContext
ist der Haupteinstiegspunkt für Spark-Funktionen; mit ihm können RDDs aus Textdateien und Python-Sammlungen erstellt werden. Das werden wir uns als Nächstes ansehen.
Ein RDD aus einer Sammlung erstellen
Mit Spark können wir neue RDDs aus Dateien und Sammlungen (Datenstrukturen wie Arrays und Listen) erstellen. Hier verwenden wir SparkContext.parallelize()
, um ein neues RDD aus einer Sammlung (dargestellt als data
) zu erstellen:
>>
>
data
=
[
(
"
fox
"
,
6
)
,
(
"
dog
"
,
5
)
,
(
"
fox
"
,
3
)
,
(
"
dog
"
,
8
)
,
(
"
cat
"
,
1
)
,
(
"
cat
"
,
2
)
,
(
"
cat
"
,
3
)
,
(
"
cat
"
,
4
)
]
>>
>
# use SparkContext (sc) as given by the PySpark shell
>>
>
# create an RDD as rdd
>>
>
rdd
=
sc
.
parallelize
(
data
)
>>
>
rdd
.
collect
(
)
[
(
'
fox
'
,
6
)
,
(
'
dog
'
,
5
)
,
(
'
fox
'
,
3
)
,
(
'
dog
'
,
8
)
,
(
'
cat
'
,
1
)
,
(
'
cat
'
,
2
)
,
(
'
cat
'
,
3
)
,
(
'
cat
'
,
4
)
]
>>
>
rdd
.
count
(
)
8
Aggregieren und Zusammenführen von Werten von Schlüsseln
Die Transformation reduceByKey()
wird zum Zusammenführen und Aggregieren von Werten verwendet. In diesem Beispiel beziehen sich x
und y
auf die Werte desselben Schlüssels:
>>
>
sum_per_key
=
rdd
.
reduceByKey
(
lambda
x
,
y
:
x
+
y
)
>>
>
sum_per_key
.
collect
(
)
[
(
'
fox
'
,
9
)
,
(
'
dog
'
,
13
)
,
(
'
cat
'
,
10
)
]
Das Quell-RDD für diese Transformation muss aus (Schlüssel, Wert)-Paaren bestehen. reduceByKey()
führt die Werte für jeden Schlüssel mithilfe einer assoziativen und kommutativen Reduktionsfunktion zusammen. Diese führt die Zusammenführung auch lokal auf jedem Mapper durch, bevor sie die Ergebnisse an einen Reducer sendet, ähnlich wie ein "Combiner" in MapReduce. Die Ausgabewird mit numPartitions
partitioniert oder mit dem Standard-Parallelitätslevel, wenn numPartitions
nicht angegeben ist. Der Standardpartitionierer ist HashPartitioner
.
Wenn T
der Typ des Wertes für (Schlüssel, Wert) Paare ist, dann kann reduceByKey()
's func()
definiert werden als:
# source_rdd : RDD[(K, T)]
# target_rdd : RDD[(K, T)]
target_rdd
=
source_rdd
.
reduceByKey
(
lambda
x
,
y
:
func
(
x
,
y
)
)
# OR you may write it by passing the function name
# target_rdd = source_rdd.reduceByKey(func)
# where
# func(T, T) -> T
# Then you may define `func()` in Python as:
# x: type of T
# y: type of T
def
func
(
x
,
y
)
:
result
=
<
aggregation
of
x
and
y
:
return
a
result
of
type
T
>
return
result
#end-def
Das bedeutet, dass:
-
Es gibt zwei Eingangsargumente (vom gleichen Typ,
T
) für den Reducerfunc()
. -
Der Rückgabetyp von
func()
muss derselbe sein wie der EingabetypT
(diese Einschränkung kann vermieden werden, wenn du die TransformationcombineByKey()
verwendest). -
Der Verkleinerer
func()
muss assoziativ sein. Informell wird eine binäre Operationf()
auf einer MengeT
als assoziativ bezeichnet, wenn sie das Assoziationsgesetz erfüllt, das besagt, dass die Reihenfolge, in der die Zahlen gruppiert werden, das Ergebnis der Operation nicht verändert. -
Der Verkleinerer
func()
muss kommutativ sein: informell eine Funktionf()
für dief(x, y) = f(y, x)
für alle Werte vonx
undy
. Das heißt, dass eine Änderung der Reihenfolge der Zahlen keinen Einfluss auf das Ergebnis der Operation haben darf.Kommutativgesetz
f(x, y) = f(y, x)
Das Kommutativgesetz gilt auch für Addition und Multiplikation, aber nicht für Subtraktion oder Division. Ein Beispiel:
5 + 3 = 3 + 5 aber 5 - 3 ≠ 3 - 5
Deshalb darfst du keine Subtraktions- oder Divisionsoperationen in einer reduceByKey()
Transformation verwenden.
Ähnliche Schlüssel gruppieren
Wir können die Transformation groupByKey()
verwenden, um die Werte für jeden Schlüssel im RDD in einer einzigen Sequenz zusammenzufassen:
>>
>
grouped
=
rdd
.
groupByKey
(
)
>>
>
grouped
.
collect
(
)
[
(
'
fox
'
,
<
ResultIterable
object
at
0x10f45c790
>
)
,
(
'
dog
'
,
<
ResultIterable
object
at
0x10f45c810
>
)
,
(
'
cat
'
,
<
ResultIterable
object
at
0x10f45cd90
>
)
]
>>
>
>>
>
# list(v) converts v as a ResultIterable into a list
>>
>
grouped
.
map
(
lambda
(
k
,
v
)
:
(
k
,
list
(
v
)
)
)
.
collect
(
)
[
(
'
fox
'
,
[
6
,
3
]
)
,
(
'
dog
'
,
[
5
,
8
]
)
,
(
'
cat
'
,
[
1
,
2
,
3
,
4
]
)
]
Gruppiere Elemente mit demselben Schlüssel zu einer Folge von Elementen.
Schau dir das Ergebnis an.
Der vollständige Name von
ResultIterable
istpyspark.resultiterable.ResultIterable
.Wende zuerst
map()
und danncollect()
an, die eine Liste zurückgeben, die alle Elemente des resultierenden RDDs enthält. Die Funktionlist()
wandeltResultIterable
in eine Liste von Objekten um.
Das Quell-RDD für diese Transformation muss aus (Schlüssel, Wert)-Paaren bestehen. groupByKey()
gruppiert die Werte für jeden Schlüssel im RDD zu einer einzigen Sequenz und teilt das resultierende RDD mit numPartitions
Partitionen oder mit dem Standard-Parallelitätsgrad auf, wenn numPartitions
nicht angegeben ist. Wenn du eine Gruppierung (mit der TransformationgroupByKey()
) vornimmst, um eine Aggregation (z. B. eine Summe oder einen Durchschnitt) über jeden Schlüssel durchzuführen, wird die Verwendung von reduceByKey()
oder aggregateByKey()
eine viel bessere Leistung bringen.
Aggregieren von Werten für ähnliche Schlüssel
Um die Werte für jeden Schlüssel zu aggregieren und zu summieren, können wir die Transformation mapValues()
und die Funktion sum()
verwenden:
>>
>
aggregated
=
grouped
.
mapValues
(
lambda
values
:
sum
(
values
)
)
>>
>
aggregated
.
collect
(
)
[
(
'
fox
'
,
9
)
,
(
'
dog
'
,
13
)
,
(
'
cat
'
,
10
)
]
values
ist eine Folge von Werten pro Schlüssel. Wir leiten jeden Wert im (Schlüssel, Wert) Paar RDD durch eine Mapper-Funktion (indem wir allevalues
mitsum(values)
hinzufügen), ohne die Schlüssel zu ändern.Zum Debuggen geben wir eine Liste zurück, die alle Elemente in diesem RDD enthält.
Wir haben mehrere Möglichkeiten, Werte zu aggregieren und zu summieren: reduceByKey()
und groupByKey()
, um nur einige zu nennen. Im Allgemeinen ist die TransformationreduceByKey()
effizienter als die Transformation groupByKey()
. Weitere Einzelheiten dazu findest du in Kapitel 4.
Wie du in den folgenden Kapiteln sehen wirst, verfügt Spark über viele weitere leistungsstarke Transformationen, die ein RDD in ein neues RDD umwandeln können. Wie bereits erwähnt, sind RDDs schreibgeschützt, unveränderlich und verteilt. RDD-Transformationen geben einen Zeiger auf ein neues RDD zurück und ermöglichen es dir, Abhängigkeiten zwischen RDDs zu erstellen. Jedes RDD in der Abhängigkeitskette (oder Kette von Abhängigkeiten) hat eine Funktion zur Berechnung seiner Daten und einen Zeiger (Abhängigkeit) auf sein Eltern-RDD.
Datenanalyse-Tools für PySpark
- Jupyter
-
Jupyter ist ein großartiges Werkzeug, um Programme zu testen und Prototypen zu erstellen. PySpark kann auch in Jupyter-Notizbüchern verwendet werden; es ist sehr praktisch für die explorative Datenanalyse.
- Apache Zeppelin
-
Zeppelin ist ein webbasiertes Notizbuch, das datengesteuerte, interaktive Datenanalysen und kollaborative Dokumente mit SQL, Python, Scala und mehr ermöglicht.
ETL-Beispiel mit Datenrahmen
In der Datenanalyse und -verarbeitung ist ETL das allgemeine Verfahren zum Kopieren von Daten aus einer oder mehreren Quellen in ein Zielsystem, das die Daten anders als die Quelle(n) oder in einem anderen Kontext als die Quelle(n) darstellt. Hier werde ich zeigen, wie Spark ETL möglich und einfach macht.
Für dieses ETL-Beispiel verwende ich die Daten der Volkszählung 2010 im JSON-Format(census_2010.json):
$ wc -l census_2010.json 101 census_2010.json $ head -5 census_2010.json {"females": 1994141, "males": 2085528, "age": 0, "year": 2010} {"females": 1997991, "males": 2087350, "age": 1, "year": 2010} {"females": 2000746, "males": 2088549, "age": 2, "year": 2010} {"females": 2002756, "males": 2089465, "age": 3, "year": 2010} {"females": 2004366, "males": 2090436, "age": 4, "year": 2010}
Hinweis
Diese Daten stammen vom U.S. Census Bureau, das zum Zeitpunkt der Erstellung dieses Buches nur die binären Optionen männlich und weiblich zur Verfügung stellt. Wir bemühen uns, so umfassend wie möglich zu sein, und hoffen, dass nationale Datensätze wie diese in Zukunft mehr Optionen bieten werden.
Definieren wir unseren ETL-Prozess:
- Extraktion
-
Zuerst erstellen wir einen Datenrahmen aus einem gegebenen JSON-Dokument.
- Transformation
-
Dann filtern wir die Daten und behalten die Datensätze für Senioren (
age > 54
). Als Nächstes fügen wir eine neue Spalte hinzu,total
, die die Summe der männlichen und weiblichen Daten darstellt. - Laden
-
Schließlich schreiben wir den überarbeiteten Datenrahmen in eine MySQL-Datenbank und überprüfen den Ladevorgang.
Lass uns diesen Prozess ein bisschen genauer untersuchen.
Extraktion
Um eine richtige Extraktion durchzuführen, müssen wir zunächst eine Instanz der Klasse SparkSession
erstellen:
from
pyspark.sql
import
SparkSession
spark
=
SparkSession
.
builder
\.
master
(
"local"
)
\.
appName
(
"ETL"
)
\.
getOrCreate
()
Als Nächstes lesen wir das JSON und erstellen einen Datenrahmen:
>>>
input_path
=
"census_2010.json"
>>>
census_df
=
spark
.
read
.
json
(
input_path
)
>>>
census_df
.
count
()
101
>>>
census_df
.
show
(
200
)
+---+-------+-------+----+
|
age
|
females
|
males
|
year
|
+---+-------+-------+----+
|
0
|
1994141
|
2085528
|
2010
|
|
1
|
1997991
|
2087350
|
2010
|
|
2
|
2000746
|
2088549
|
2010
|
...
|
54
|
2221350
|
2121536
|
2010
|
|
55
|
2167706
|
2059204
|
2010
|
|
56
|
2106460
|
1989505
|
2010
|
...
|
98
|
35778
|
8321
|
2010
|
|
99
|
25673
|
4612
|
2010
|
+---+-------+-------+----+
only
showing
top
100
rows
Transformation
Die Umwandlung kann viele Prozesse umfassen, deren Zweck es ist, die Daten zu bereinigen, zu formatieren oder Berechnungen durchzuführen, um sie an deine Anforderungen anzupassen. Du kannst zum Beispiel fehlende oder doppelte Daten entfernen, Spalten zusammenführen, um neue Spalten zu erstellen, oder bestimmte Zeilen oder Spalten herausfiltern. Sobald wir den Datenrahmen durch den Extraktionsprozess erstellt haben, können wir viele nützliche Transformationen durchführen, wie z. B. nur die Senioren auswählen:
>>>
seniors
=
census_df
[
census_df
[
'age'
]
>
54
]
>>>
seniors
.
count
()
46
>>>
seniors
.
show
(
200
)
+---+-------+-------+----+
|
age
|
females
|
males
|
year
|
+---+-------+-------+----+
|
55
|
2167706
|
2059204
|
2010
|
|
56
|
2106460
|
1989505
|
2010
|
|
57
|
2048896
|
1924113
|
2010
|
...
|
98
|
35778
|
8321
|
2010
|
|
99
|
25673
|
4612
|
2010
|
|
100
|
51007
|
9506
|
2010
|
+---+-------+-------+----+
Als Nächstes erstellen wir eine neue aggregierte Spalte namens total
, in der die Anzahl der Männer und Frauen addiert wird:
>>>
from
pyspark.sql.functions
import
lit
>>>
seniors_final
=
seniors
.
withColumn
(
'total'
,
lit
(
seniors
.
males
+
seniors
.
females
))
>>>
seniors_final
.
show
(
200
)
+---+-------+-------+----+-------+
|
age
|
females
|
males
|
year
|
total
|
+---+-------+-------+----+-------+
|
55
|
2167706
|
2059204
|
2010
|
4226910
|
|
56
|
2106460
|
1989505
|
2010
|
4095965
|
|
57
|
2048896
|
1924113
|
2010
|
3973009
|
...
|
98
|
35778
|
8321
|
2010
|
44099
|
|
99
|
25673
|
4612
|
2010
|
30285
|
|
100
|
51007
|
9506
|
2010
|
60513
|
+---+-------+-------+----+-------+
Laden
Der Ladevorgang beinhaltet das Speichern oder Schreiben der endgültigen Ausgabe des Transformationsschritts. Hier werden wir den seniors_final
Datenrahmen in eine MySQL-Tabelle schreiben:
seniors_final
\.
write
\.
format
(
"jdbc"
)
\.
option
(
"driver"
,
"com.mysql.jdbc.Driver"
)
\.
mode
(
"overwrite"
)
\.
option
(
"url"
,
"jdbc:mysql://localhost/testdb"
)
\.
option
(
"dbtable"
,
"seniors"
)
\.
option
(
"user"
,
"root"
)
\.
option
(
"password"
,
"root_password"
)
\.
save
()
Der letzte Schritt des Ladens ist die Überprüfung des Ladevorgangs:
$
mysql
-uroot
-p
Enter
password:
<
password>
Your
MySQL
connection
id
is
9
Server
version:
5
.7.30
MySQL
Community
Server
(
GPL
)
mysql>
use
testdb
;
Database
changed
mysql>
select
*
from
seniors
;
+------+---------+---------+------+---------+
|
age
|
females
|
males
|
year
|
total
|
+------+---------+---------+------+---------+
|
55
|
2167706
|
2059204
|
2010
|
4226910
|
|
56
|
2106460
|
1989505
|
2010
|
4095965
|
|
57
|
2048896
|
1924113
|
2010
|
3973009
|
...
|
98
|
35778
|
8321
|
2010
|
44099
|
|
99
|
25673
|
4612
|
2010
|
30285
|
|
100
|
51007
|
9506
|
2010
|
60513
|
+------+---------+---------+------+---------+
46
rows
in
set
(
0
.00
sec
)
Zusammenfassung
Fassen wir einige wichtige Punkte des Kapitels zusammen:
-
Spark ist eine schnelle und leistungsstarke Unified-Analytics-Engine (bis zu hundertmal schneller als herkömmliches Hadoop MapReduce) und bietet robuste, verteilte, fehlertolerante Datenabstraktionen (sogenannte RDDs und DataFrames). Spark lässt sich über die Pakete MLlib (Bibliothek für maschinelles Lernen) und GraphX (Graphenbibliothek) in die Welt des maschinellen Lernens und der Graphikanalyse integrieren.
-
Du kannst die Transformationen und Aktionen von Spark in vier Programmiersprachen nutzen: Java, Scala, R und Python. Mit PySpark (der Python-API für Spark) kannst du Big-Data-Probleme lösen und deine Daten effizient in das gewünschte Ergebnis und Format transformieren.
-
Big Data kann mit den Datenabstraktionen von Spark dargestellt werden (RDDs, Datenrahmen und Datensätze - alle diese sind verteilte Datensätze).
-
Du kannst PySpark über die PySpark-Shell (mit dem Befehl
pyspark
von einer Kommandozeile aus) für die interaktive Spark-Programmierung ausführen. Mit der PySpark-Shell kannst du RDDs und Datenrahmen erstellen und manipulieren. -
Du kannst eine eigenständige PySpark-Anwendung mit dem Befehl
spark-submit
an einen Spark-Cluster übermitteln; eigenständige Anwendungen mit PySpark werden in Produktionsumgebungen eingesetzt. -
Spark bietet viele Transformationen und Aktionen zur Lösung von Big-Data-Problemen, die sich in ihrer Leistung unterscheiden (z. B.
reduceByKey()
gegenübergroupByKey()
undcombineByKey()
gegenübergroupByKey()
).
Das nächste Kapitel befasst sich mit einigen wichtigen Spark-Transformationen.
Get Datenalgorithmen mit Spark now with the O’Reilly learning platform.
O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.