Kapitel 1. Einführung in Spark und PySpark

Diese Arbeit wurde mithilfe von KI übersetzt. Wir freuen uns über dein Feedback und deine Kommentare: translation-feedback@oreilly.com

Spark ist eine leistungsstarke Analyse-Engine für die Verarbeitung großer Datenmengen, die auf Geschwindigkeit, Benutzerfreundlichkeit und Erweiterbarkeit für Big-Data-Anwendungen ausgerichtet ist. Es ist eine bewährte und weit verbreitete Technologie, die von vielen Unternehmen genutzt wird, die tagtäglich mit großen Daten arbeiten. Obwohl die "Muttersprache" von Spark Scala ist (der größte Teil von Spark wird in Scala entwickelt), bietet es auch High-Level-APIs in Java, Python und R.

In diesem Buch werden wir Python über PySpark verwenden, eine API, die das Spark-Programmiermodell für Python verfügbar macht. Da Python die am leichtesten zugängliche Programmiersprache ist und Spark über eine leistungsstarke und ausdrucksstarke API verfügt, ist PySpark aufgrund seiner Einfachheit die beste Wahl für uns. PySpark ist eine Schnittstelle für Spark in der Programmiersprache Python, die die folgenden zwei wichtigen Funktionen bietet:

  • Es ermöglicht uns, Spark-Anwendungen mit Python-APIs zu schreiben.

  • Es bietet die PySpark-Shell für die interaktive Analyse von Daten in einer verteiltenUmgebung.

Ziel dieses Kapitels ist es, PySpark als Hauptkomponente des Spark-Ökosystems vorzustellen und dir zu zeigen, dass es effektiv für Big-Data-Aufgaben wie ETL-Operationen, Indizierung von Milliarden von Dokumenten, Ingesting von Millionen von Genomen, maschinelles Lernen, Graphdatenanalyse, DNA-Datenanalyse und vieles mehr eingesetzt werden kann. Zunächst werde ich die Architekturen von Spark und PySpark vorstellen und anhand von Beispielen die Ausdrucksstärke von PySpark zeigen. Ich werde einen Überblick über die Kernfunktionen (Transformationen und Aktionen) und Konzepte von Spark geben, damit du in der Lage bist, Spark und PySpark sofort zu nutzen. Die wichtigsten Datenabstraktionen von Spark sind belastbare verteilte Datensätze (RDDs), Datenrahmen und Datensätze. Wie du sehen wirst, kannst du deine Daten (die als Hadoop-Dateien, Amazon S3-Objekte, Linux-Dateien, Datenstrukturen von Sammlungen, relationale Datenbanktabellen und mehr gespeichert sind) in beliebigen Kombinationen von RDDs undDataFrames darstellen.

Sobald deine Daten als Spark-Datenabstraktion dargestellt sind, kannst du Transformationen darauf anwenden und neue Datenabstraktionen erstellen, bis die Daten die endgültige Form haben, nach der du suchst. Mit den Spark-Transformationen (wie map() und reduceByKey()) kannst du deine Daten von einer Form in eine andere umwandeln, bis du das gewünschte Ergebnis erhältst. Ich werde diese Datenabstraktionen in Kürze erläutern, aber zuerst wollen wir ein wenig näher darauf eingehen, warum Spark die beste Wahl für die Datenanalyse ist.

Warum Spark für Datenanalyse

Spark ist eine leistungsstarke Analyse-Engine, die für die Verarbeitung großer Datenmengen verwendet werden kann. Die wichtigsten Gründe für den Einsatz von Spark sind:

  • Spark ist einfach, leistungsstark und schnell.

  • Spark ist kostenlos und quelloffen.

  • Spark läuft überall (Hadoop, Mesos, Kubernetes, eigenständig oder in der Cloud).

  • Spark kann Daten von/zu jeder Datenquelle lesen/schreiben (Amazon S3, Hadoop HDFS, relationale Datenbanken usw.).

  • Spark kann in fast jede Datenanwendung integriert werden.

  • Spark kann Daten in zeilenbasierten (wie Avro) und spaltenbasierten (wie Parquet und ORC) Formaten lesen/schreiben.

  • Spark verfügt über eine Reihe von einfachen APIs für alle Arten von ETL-Prozessen.

In den letzten fünf Jahren hat sich Spark so weiterentwickelt, dass ich glaube, dass es zur Lösung jedes Big-Data-Problems eingesetzt werden kann. Dafür spricht auch die Tatsache, dass alle Big-Data-Unternehmen wie Facebook, Illumina, IBM und Google Spark tagtäglich in Produktionssystemen einsetzen.

Spark ist eine der besten Entscheidungen für die Verarbeitung großer Datenmengen und für die Lösung vonMapReduce-Problemenund darüber hinaus, denn es erschließt die Macht der Daten, indem es große Daten mit leistungsstarken APIs und Geschwindigkeit verarbeitet. Die Verwendung von MapReduce/Hadoop zur Lösung von Big-Data-Problemen ist komplex, und du musst eine Menge Low-Level-Code schreiben, um selbst primitive Probleme zu lösen - hier kommt die Leistung und Einfachheit von Spark ins Spiel.Apache Sparkist wesentlich schneller als ApacheHadoop, da es In-Memory-Caching und optimierteAusführung für eine schnelle Leistung nutzt und allgemeine Batch-Verarbeitung, Streaming-Analysen, maschinelles Lernen, Graphenalgorithmen undSQL-Abfragen unterstützt.

Für PySpark gibt es in Spark zwei grundlegende Datenabstraktionen: RDD und DataFrame. Ich zeige dir, wie du deine Daten einliest und als RDD (eine Menge von Elementen desselben Typs) oder DataFrame (eine Tabelle mit Zeilen und benannten Spalten) darstellst; so kannst du einer verteilten Datensammlung eine Struktur auferlegen, die eine Abstraktion auf höherer Ebene ermöglicht. Sobald deine Daten als RDD oder DataFrame dargestellt sind, kannst du Transformationsfunktionen (wie Mapper, Filter und Reducer) darauf anwenden, um deine Daten in die gewünschte Form zu bringen. Ich werde dir viele Spark-Transformationen vorstellen, die du für ETL-Prozesse, Analysen und datenintensive Berechnungen nutzen kannst.

Einige einfache RDD-Transformationen sind in Abbildung 1-1 dargestellt.

daws 0101
Abbildung 1-1. Einfache RDD-Transformationen

Diese Abbildung zeigt die folgenden Transformationen:

  1. Zuerst lesen wir unsere Eingabedaten (dargestellt als Textdatei sample.txt - hierzeige ich nur die ersten beiden Zeilen/Datensätze der Eingabedaten) mit einer Instanz von SparkSession, dem Einstiegspunkt in die Programmierung von Spark. Die Instanz SparkSession wird als spark Objekt dargestellt. Beim Lesen der Eingabedaten wird ein neues RDD als RDD[String] erstellt: Jeder Eingabedatensatz wird in ein RDD-Element des Typs String umgewandelt (wenn dein Eingabepfad aus N Datensätze hat, dann ist die Anzahl der RDD-Elemente N). Dies wird durch den folgenden Code erreicht:

    # Create an instance of SparkSession
    spark = SparkSession.builder.getOrCreate()
    # Create an RDD[String], which represents all input
    # records; each record becomes an RDD element
    records = spark.sparkContext.textFile("sample.txt")
  2. Als Nächstes wandeln wir alle Zeichen in Kleinbuchstaben um. Dies geschieht durch die map() Transformation, die eine 1-zu-1-Transformation ist:

    # Convert each element of the RDD to lowercase
    # x denotes a single element of the RDD
    # records: source RDD[String]
    # records_lowercase: target RDD[String]
    records_lowercase = records.map(lambda x: x.lower())
  3. Dann verwenden wir eine flatMap() Transformation, die eine 1-zu-viele-Transformation ist, um jedes Element (das einen einzelnen Datensatz darstellt) in eine Folge von Zielelementen (die jeweils ein Wort darstellen) umzuwandeln.Die flatMap() Transformation gibt ein neues RDD zurück, indem sie zuerst eine Funktion (hier split(",")) auf alle Elemente des Quell-RDD anwendet und dann die Ergebnisse abflacht:

    # Split each record into a list of words
    # records_lowercase: source RDD[String]
    # words: target RDD[String]
    words = records_lowercase.flatMap(lambda x: x.split(","))
  4. Schließlich lassen wir Wortelemente mit einer Länge kleiner oder gleich 2 weg. Die folgende filter() Transformation lässt unerwünschte Wörter weg und behält nur die mit einer Länge größer als 2:

    # Keep words with a length greater than 2
    # x denotes a word
    # words: source RDD[String]
    # filtered: target RDD[String]
    filtered = words.filter(lambda x: len(x) > 2)

Wie du sehen kannst, sind Spark-Transformationen auf hohem Niveau, leistungsstark und einfach. Spark ist von Natur aus verteilt und parallel: Deine Eingabedaten sind partitioniert und können von Transformationen (wie Mappern, Filtern und Reduktoren) parallel in einer Cluster-Umgebung verarbeitet werden. Kurz gesagt: Um ein Datenanalyseproblem in PySpark zu lösen, liest du Daten und stellst sie als RDD oder DataFrame dar (je nach Art des Datenformats). Dann schreibst du eine Reihe von Transformationen, um deine Daten in die gewünschte Ausgabe umzuwandeln. Spark partitioniert deine DataFrames und RDDs automatisch und verteilt die Partitionen auf verschiedene Clusterknoten. Partitionen sind die Grundeinheiten der Parallelität in Spark. Die Parallelität ermöglicht es Entwicklern, Aufgaben auf Hunderten von Computerservern in einem Cluster parallel und unabhängig voneinander auszuführen. Eine Partition in Spark ist ein Chunk (eine logische Unterteilung) von Daten, die auf einem Knoten im Cluster gespeichert sind. Datenrahmen (DataFrames) und RDDs sind Sammlungen von Partitionen. Spark verfügt über einen Standard-Datenpartitionierer für RDDs und DataFrames, aber du kannst diese Partitionierung mit deiner eigenenProgrammierung außer Kraft setzen.

Als Nächstes wollen wir ein wenig tiefer in das Ökosystem und die Architektur von Spark eintauchen.

Das Spark-Ökosystem

Das Ökosystem von Spark ist inAbbildung 1-2 dargestellt. Es besteht aus drei Hauptkomponenten:

Umgebungen

Spark kann überall laufen und lässt sich gut in andere Umgebungen integrieren.

Anwendungen

Spark lässt sich gut mit einer Vielzahl von Big-Data-Plattformen und -Anwendungen integrieren.

Datenquellen

Spark kann Daten von und in viele Datenquellen lesen und schreiben.

daws 0102
Abbildung 1-2. Das Spark-Ökosystem (Quelle: Databricks)

Das umfangreiche Ökosystem von Spark macht PySpark zu einem großartigen Werkzeug für ETL, Datenanalyse und viele andere Aufgaben. Mit PySpark kannst du Daten aus vielen verschiedenen Datenquellen (dem Linux-Dateisystem, Amazon S3, dem Hadoop Distributed File System, relationalen Tabellen, MongoDB, Elasticsearch, Parquet-Dateien usw.) einlesen und als Spark-Datenabstraktion, wie z. B. RDDs oder DataFrames, darstellen. Sobald deine Daten in dieser Form vorliegen, kannst du eine Reihe von einfachen und leistungsstarken Spark-Transformationen verwenden, um die Daten in die gewünschte Form und das gewünschte Format zu bringen. Du kannst zum Beispiel die Transformation filter() verwenden, um unerwünschte Datensätze zu entfernen, groupByKey(), um deine Daten nach dem gewünschten Schlüssel zu gruppieren, und schließlich die Transformation mapValues(), um die gruppierten Daten abschließend zu aggregieren (z. B. um den Durchschnitt, den Median und die Standardabweichung von Zahlen zu ermitteln). Alle diese Transformationen sind mit der einfachen, aber leistungsstarken PySpark-API möglich.

Spark Architektur

Wenn du kleine Datenmengen hast, ist es möglich, sie mit einem einzigen Computer in einer angemessenen Zeit zu analysieren. Wenn du große Datenmengen hast, kann die Analyse und Verarbeitung (und Speicherung) dieser Daten mit einem einzigen Computer unerschwinglich langsam oder sogar unmöglich sein. Aus diesem Grund wollen wir Spark verwenden.

Spark verfügt über eine Kernbibliothek und eine Reihe integrierter Bibliotheken (SQL, GraphX, Streaming, MLlib), wie in Abbildung 1-3 dargestellt. Wie du siehst, kann Spark über seine DataSource-API mit vielen Datenquellen interagieren, z. B. mit Hadoop, HBase, Amazon S3, Elasticsearch und MySQL, um nur einige zu nennen.

daws 0103
Abbildung 1-3. Spark-Bibliotheken

Diese Abbildung zeigt die wahre Stärke von Spark: Du kannst mehrere verschiedene Sprachen verwenden, um deine Spark-Anwendungen zu schreiben, und dann umfangreiche Bibliotheken nutzen, um verschiedene Big-Data-Probleme zu lösen. Gleichzeitig kannst du Daten aus einer Vielzahl von Datenquellen lesen und schreiben.

Schlüsselbegriffe

Um die Architektur von Spark zu verstehen, musst du ein paar Schlüsselbegriffe kennen:

SparkSession

Die Klasse SparkSession, die im Paket pyspark.sql definiert ist, ist der Einstiegspunkt für die Programmierung von Spark mit den Dataset- und DataFrame-APIs. Um etwas Nützliches mit einem Spark-Cluster zu tun, musst du zunächst eine Instanz dieser Klasse erstellen, die dir Zugriff auf eine Instanz von SparkContext gibt.

Hinweis

PySparkverfügt über eine umfassende API (bestehend aus Paketen, Modulen, Klassen und Methoden) für den Zugriff auf die Spark-API. Es ist wichtig zu wissen, dass alle Spark-APIs, Pakete, Module, Klassen und Methoden, die in diesem Buch behandelt werden, PySpark-spezifisch sind. Wenn ich zum Beispiel von der Klasse SparkContext spreche, meine ich die Python-Klasse pyspark.SparkContext, die im Paket pyspark definiert ist, und wenn ich von der Klasse SparkSession spreche, meine ich die Python-Klasse pyspark.sql.SparkSession, die im Modul pyspark.sql definiert ist.

SparkContext

Die Klasse SparkContext, die im Paket pyspark definiert ist, ist der Haupteinstiegspunkt für Spark-Funktionen. Ein SparkContextstellt eine Verbindung zum Spark-Cluster-Manager her und kann verwendet werden, um RDDs und Broadcast-Variablen im Cluster zu erstellen. Wenn du eine Instanz von SparkSession erstellst, wird die SparkContext innerhalb deiner Sitzung als Attribut SparkSession.sparkContext verfügbar.

Treiber

Alle Spark-Anwendungen (einschließlich der PySpark-Shell und eigenständiger Python-Programme) laufen als unabhängige Prozesse. Diese Prozesse werden von einem SparkContext in einem Treiberprogramm koordiniert. Um ein eigenständiges Python-Programm an Spark zu übergeben, musst du ein Treiberprogramm mit der PySpark-API (oder Java oder Scala) schreiben. Dieses Programm ist dafür zuständig, die main() Funktion der Anwendung auszuführen und die SparkContext zu erstellen. Es kann auch zum Erstellen von RDDs und Datenrahmen verwendet werden.

Arbeiter/in

In einer Spark-Cluster-Umgebung gibt es zwei Arten von Knoten: einen (oder zwei, für hohe Verfügbarkeit) Master und eine Reihe von Workern. Ein Worker ist ein beliebiger Knoten, der Programme im Cluster ausführen kann. Wenn ein Prozess für eine Anwendung gestartet wird, erwirbt diese Anwendung Executors auf Worker-Knoten, die für die Ausführung von Spark-Aufgaben zuständig sind.

Cluster Manager

Der "Master"-Knoten wird als Clustermanager bezeichnet. Die Hauptfunktion dieses Knotens ist die Verwaltung der Clusterumgebung und der Server, die Spark zur Ausführung von Aufgaben nutzt. Der Clustermanager weist den einzelnen Anwendungen Ressourcen zu. Spark unterstützt fünf Arten von Clustermanagern, je nachdem, wo es ausgeführt wird:

  1. Standalone (Spark's eigene integrierte Cluster-Umgebung)

  2. Mesos (ein Kernel für verteilte Systeme)

  3. Hadoop YARN

  4. Kubernetes

  5. Amazon EC2

Hinweis

Die Master/Worker-Terminologie ist zwar veraltet und wird in vielen Softwarekontexten nicht mehr verwendet, aber sie ist immer noch Teil der Funktionalität von Apache Spark, weshalb ich diese Terminologie in diesem Buch verwende.

Die Architektur von Spark in aller Kürze

Eine Übersicht über die Architektur von Spark ist in Abbildung 1-4 dargestellt. Informell besteht ein Spark-Cluster aus einem Master-Knoten (dem "Clustermanager"), der für die Verwaltung der Spark-Anwendungen zuständig ist, und einer Reihe von "Worker"-Knoten (Ausführungsknoten), die für die Ausführung der Aufgaben zuständig sind, die von den Spark-Anwendungen (deinen Anwendungen, die du auf dem Spark-Cluster ausführen möchtest) eingereicht wurden.

daws 0104
Abbildung 1-4. Spark-Architektur

Je nachdem, in welcher Umgebung Spark läuft, ist der Clustermanager, der diesen Servercluster verwaltet, entweder der eigenständige Clustermanager von Spark, Kubernetes, Hadoop YARN oder Mesos. Wenn der Spark-Cluster läuft, kannst du Spark-Anwendungen an den Clustermanager übermitteln, der deiner Anwendung Ressourcen zuweist, damit du deine Datenanalyse durchführen kannst.

Dein Cluster kann aus einem, zehn, hunderten oder sogar tausenden von Worker-Knoten bestehen, je nach den Bedürfnissen deines Unternehmens und deinen Projektanforderungen. Du kannst Spark auf einem eigenständigen Server wie einem MacBook, Linux- oder Windows-PC ausführen, aberin der Regel wird Spark in Produktionsumgebungen auf einem Cluster von Linux-Servern betrieben. Um ein Spark-Programm auszuführen, brauchst du Zugang zu einem Spark-Cluster und ein Treiberprogramm, das die Transformationen und Aktionen auf RDDs von Daten deklariert und diese Anfragen an den Clustermanager weiterleitet. In diesem Buch werden alle Treiberprogramme in PySpark geschrieben.

Wenn Sie eine PySpark-Shell starten (durch Ausführen von<spark-installed-dir>/bin/pyspark), bekommst du automatisch zwei Variablen/Objekte definiert:

spark

Eine Instanz von SparkSession, die sich ideal zum Erstellen von Datenrahmen eignet

sc

Eine Instanz von SparkContext, die ideal für die Erstellung von RDDs ist

Wenn du eine eigenständige PySpark-Anwendung schreibst (einen Python-Treiber, der die PySpark-API verwendet), musst du explizit selbst eine Instanz vonSparkSession erstellen. Eine SparkSessionkann dazu verwendet werden:

  • Datenrahmen erstellen

  • Datenrahmen als Tabellen registrieren

  • SQL über Tabellen und Cache-Tabellen ausführen

  • Lesen/Schreiben von Text, CSV, JSON, Parquet und anderen Dateiformaten

  • Lesen/Schreiben von relationalen Datenbanktabellen

PySpark definiert SparkSession als:

pyspark.sql.SparkSession (Python class, in pyspark.sql module)
class pyspark.sql.SparkSession(sparkContext,jsparkSession=None)
SparkSession: the entry point to programming Spark with the RDD
and DataFrame API.

Um eine SparkSession in Python zu erstellen, verwende das hier gezeigte Builder-Muster:

# import required Spark class
from pyspark.sql import SparkSession 1

# create an instance of SparkSession as spark
spark = SparkSession.builder \ 2
  .master("local") \
  .appName("my-application-name") \
  .config("spark.some.config.option", "some-value") \ 3
  .getOrCreate() 4

# to debug the SparkSession
print(spark.version) 5

# create a reference to SparkContext as sc
# SparkContext is used to create new RDDs
sc = spark.sparkContext 6

# to debug the SparkContext
print(sc)
1

Importiert die Klasse SparkSession aus dem Modul pyspark.sql.

2

Ermöglicht den Zugriff auf die Builder-API, um SparkSession Instanzen zu erstellen.

3

Setzt eine config Option. Optionen, die mit dieser Methode gesetzt werden, werden automatisch sowohl an SparkConf als auch an die eigene Konfiguration vonSparkSessionweitergegeben. Bei der Erstellung eines SparkSession Objekts kannst du eine beliebige Anzahl vonconfig(<key>, <value>) Optionen festlegen.

4

Ruft eine bestehende SparkSession ab oder erstellt eine neue, wenn es keine gibt, basierend auf den hier eingestellten Optionen.

5

Nur für Debugging-Zwecke.

6

Ein SparkContext kann von einer Instanz von SparkSession aus referenziert werden.

PySpark definiert SparkContext als:

class pyspark.SparkContext(master=None, appName=None, ...)
SparkContext: the main entry point for Spark functionality.
A SparkContext represents the connection to a Spark cluster,
and can be used to create RDD (the main data abstraction for
Spark) and broadcast variables (such as collections and data
structures) on that cluster.

SparkContext ist der Haupteinstiegspunkt für Spark-Funktionen. Eine Shell (z. B. die PySpark-Shell) oder ein PySpark-Treiberprogramm kann nicht mehr als eine Instanz von erstellen. stellt die Verbindung zu einem Spark-Cluster dar und kann verwendet werden, um neue RDDs und Broadcast-Variablen (gemeinsame Datenstrukturen und Sammlungen - eine Art schreibgeschützte globale Variablen) auf diesem Cluster zu erstellen. SparkContext SparkContextAbbildung 1-5 zeigt, wie ein verwendet werden kann, um ein neues RDD aus einer Eingabetextdatei ( ) zu erstellen und es dann mit der Transformation in ein anderes RDD ( ) umzuwandeln. Wie du sehen kannst, SparkContext records_rdd flatMap() words_rddRDD.flatMap(f) ein neues RDD zurück, indem es zunächst eine Funktion (f) auf alle Elemente des Quell-RDDs anwendet und dann die Ergebnisse reduziert.

daws 0105
Abbildung 1-5. Erstellung von RDDs durch SparkContext

Um die Objekte SparkSession und SparkContextzu erstellen, verwendest du das folgende Muster:

    # create an instance of SparkSession
    spark_session = SparkSession.builder.getOrCreate()

    # use the SparkSession to access the SparkContext
    spark_context = spark_session.sparkContext

Wenn du nur mit RDDs arbeiten willst, kannst du eine Instanz vonSparkContext wie folgt erstellen:

    from pyspark import SparkContext
    spark_context = SparkContext("local", "myapp");

Nachdem du nun die Grundlagen von Spark kennst, wollen wir ein wenig tiefer in PySpark eintauchen.

Die Macht von PySpark

PySpark ist eine Python-API für Apache Spark, die die Zusammenarbeit zwischen Spark und der Programmiersprache Python unterstützen soll. Die meisten Datenwissenschaftler/innen kennen bereits Python, und PySpark macht es ihnen leicht, kurzen, prägnanten Code für verteilte Berechnungen mit Spark zu schreiben. PySpark ist ein All-in-One-Ökosystem, das mit seiner Unterstützung für RDDs, DataFrames, GraphFrames, MLlib, SQL und mehr komplexe Datenanforderungen bewältigen kann.

Ich werde dir die erstaunliche Leistung von PySpark anhand eines einfachen Beispiels zeigen. Angenommen, wir haben viele Datensätze mit Daten über URL-Besuche von Nutzern (die von einer Suchmaschine von vielen Webservern gesammelt wurden) im folgenden Format:

<url_address><,><frequency>

Hier sind ein paar Beispiele dafür, wie diese Aufzeichnungen aussehen:

http://mapreduce4hackers.com,19779
http://mapreduce4hackers.com,31230
http://mapreduce4hackers.com,15708
...
https://www.illumina.com,87000
https://www.illumina.com,58086
...

Nehmen wir an, wir wollen den Durchschnitt, den Median und die Standardabweichung der Besuchszahlen pro Schlüssel (d.h. url_address) ermitteln. Eine weitere Anforderung ist, dass wir alle Datensätze mit einer Länge von weniger als 5 verwerfen wollen (da es sich dabei um fehlerhafte URLs handeln könnte). Dafür lässt sich in PySpark leicht eine elegante Lösung finden, wie Abbildung 1-6 zeigt.

daws 0106
Abbildung 1-6. Einfacher Arbeitsablauf zur Berechnung von Mittelwert, Median und Standardabweichung

Erstellen wir zunächst einige grundlegende Python-Funktionen, die uns bei der Lösung unseres einfachen Problems helfen. Die erste Funktion, create_pair(), akzeptiert einen einzelnen Datensatz der Form <url_address><,><frequency> und gibt ein (Schlüssel, Wert)-Paar zurück (was uns später eineGROUP BY für das Schlüsselfeld ermöglicht), wobei der Schlüssel ein url_address und der Wert das zugehörige frequency ist:

 # Create a pair of (url_address, frequency)
 # where url_address is a key and frequency is a value
 # record denotes a single element of RDD[String]
 # record: <url_address><,><frequency>
 def create_pair(record): 1
     tokens = record.split(',') 2
     url_address = tokens[0]
     frequency = tokens[1]
     return (url_address, frequency) 3
 #end-def
1

Nimm einen Eintrag in der Form <url_address><,><frequency> an.

2

Tokenisiere den Eingabedatensatz, indem du den url_address als Schlüssel (tokens[0]) und den frequency als Wert (tokens[1]) verwendest.

3

Gib ein Paar von (url_address, frequency) zurück.

Die nächste Funktion, compute_stats(), akzeptiert eine Liste von Häufigkeiten (als Zahlen) und berechnet drei Werte, den Durchschnitt, den Median und die Standardabweichung:

 # Compute average, median, and standard
 # deviation for a given set of numbers
 import statistics 1
 # frequencies = [number1, number2, ...]
 def compute_stats(frequencies): 2
 	average = statistics.mean(frequencies) 3
 	median = statistics.median(frequencies) 4
 	standard_deviation = statistics.stdev(frequencies) 5
 	return (average, median, standard_deviation) 6
 #end-def
1

Dieses Modul bietet Funktionen zur Berechnung der mathematischen Statistik von numerischen Daten.

2

Akzeptiere eine Liste von Frequenzen.

3

Berechne den Durchschnitt der Frequenzen.

4

Berechne den Median der Häufigkeiten.

5

Berechne die Standardabweichung der Häufigkeiten.

6

Gib das Ergebnis als Tripel zurück.

Als Nächstes zeige ich dir die erstaunliche Leistung von PySpark in nur wenigen Codezeilen, indem ich Spark-Transformationen und unsere eigenen Python-Funktionen verwende:

# input_path = "s3://<bucket>/key"
input_path = "/tmp/myinput.txt"
results = spark 1
        .sparkContext 2
        .textFile(input_path) 3
        .filter(lambda record: len(record) > 5) 4
        .map(create_pair) 5
        .groupByKey() 6
        .mapValues(compute_stats) 7
1

spark bezeichnet eine Instanz von SparkSession, dem Einstiegspunkt in die Programmierung von Spark.

2

sparkContext (ein Attribut von SparkSession) ist der Haupteinstiegspunkt für Spark-Funktionen.

3

Daten als verteilte Menge von StringDatensätzen lesen (erstellt eine RDD[String]).

4

Verwerfe Datensätze mit einer Länge kleiner oder gleich 5 (behalte Datensätze mit einer Länge größer als 5).

5

Erstelle (url_address, frequency) Paare aus den Eingabedatensätzen.

6

Gruppiere die Daten nach Schlüsseln - jeder Schlüssel ( url_address) wird mit einer Liste von Frequenzen verknüpft.

7

Wende die Funktion compute_stats() auf die Liste der Frequenzen an.

Das Ergebnis ist eine Reihe von (Schlüssel, Wert)-Paaren der Form:

(url_address, (average, median, standard_deviation))

wobei url-address ein Schlüssel und(average, median, standard_deviation)ein Wert ist.

Hinweis

Das Wichtigste an Spark ist, dass es die Gleichzeitigkeit von Funktionen und Operationen durch die Partitionierung von Daten maximiert. Nimm ein Beispiel:

Wenn deine Eingabedaten 600 Milliarden Zeilen haben und du einen Cluster mit 10 Knoten verwendest, werden deine Eingabedaten aufgeteilt in N ( > 1) Chunks aufgeteilt, die unabhängig und parallel verarbeitet werden. Wenn N=20,000 (die Anzahl der Chunks oder Partitionen), dann hat jeder Chunk etwa 30 Millionen Datensätze/Elemente (600.000.000.000 / 20.000 = 30.000.000). Wenn du einen großen Cluster hast, können alle 20.000 Chunks auf einmal verarbeitet werden. Wenn du einen kleineren Cluster hast, kann es sein, dass nur alle 100 Chunks unabhängig und parallel verarbeitet werden können. Dieser Prozess wird fortgesetzt, bis alle 20.000 Chunks verarbeitet sind.

PySpark Architektur

PySpark baut auf der Java-API von Spark auf. Die Daten werden in Python verarbeitet und in der Java Virtual Machine (JVM) zwischengespeichert bzw. geshuffelt (das Konzept des Shuffelns werde ich in Kapitel 2 behandeln). Eine Übersicht über die Architektur von PySpark ist in Abbildung 1-7 dargestellt.

daws 0107
Abbildung 1-7. PySpark-Architektur

Und der Datenfluss von PySpark ist in Abbildung 1-8 dargestellt.

daws 0108
Abbildung 1-8. PySpark-Datenfluss

Im Python-Treiberprogramm (deine Spark-Anwendung in Python) verwendet SparkContextPy4J, um eine JVM zu starten und eineJavaSparkContext zu erstellen. Py4J wird im Treiber nur für die lokale Kommunikation zwischen den Python- und Java-Objekten SparkContext verwendet; große Datenübertragungen werden über einen anderen Mechanismus durchgeführt. RDD-Transformationen in Python werden aufPythonRDD Objekte in Java abgebildet. Auf entfernten Worker-Rechnern starten PythonRDD Objekte Python-Unterprozesse und kommunizieren mit diesen über Pipes, um den Code des Benutzers und die zu verarbeitenden Daten zu senden.

Hinweis

Mit Py4J können Python-Programme, die in einem Python-Interpreter laufen, dynamisch auf Java-Objekte in einer JVM zugreifen. Methoden werden so aufgerufen, als befänden sich die Java-Objekte im Python-Interpreter, und auf Java-Sammlungen kann mit Standard-Python-Sammlungsmethoden zugegriffen werden. Py4J ermöglicht es Java-Programmen auch, Python-Objekte zurückzurufen.

Spark-Datenabstraktionen

Um Daten in der Programmiersprache Python zu manipulieren, verwendest du Ganzzahlen, Strings, Listen und Wörterbücher. Um Daten in Spark zu manipulieren und zu analysieren, musst du sie als Spark-Datensatz darstellen. Spark unterstützt drei Arten von Dataset-Abstraktionen:

  • RDD (Resilient Distributed Dataset):

    • Low-Level-API

    • Bezeichnet durch RDD[T] (jedes Element hat den Typ T)

  • Datenrahmen (ähnlich wie bei relationalen Tabellen):

    • High-Level-API

    • Bezeichnet mit Table(column_name_1, column_name_2, ...)

  • Datensatz (ähnlich wie bei relationalen Tabellen):

    • High-Level-API (nicht in PySpark verfügbar)

Die Datenabstraktion Dataset wird in stark typisierten Sprachen wie Java verwendet und wird in PySpark nicht unterstützt. RDDs und Datenrahmen werden in den folgenden Kapiteln ausführlich behandelt, aber ich gebe hier eine kurze Einführung.

RDD Beispiele

Im Wesentlichen stellt ein RDD deine Daten als eine Sammlung von Elementen dar. Es ist eine unveränderliche Menge von verteilten Elementen des Typs T, die als RDD[T] bezeichnet wird.

Tabelle 1-1 zeigt Beispiele für drei einfache Arten von RDDs:

RDD[Integer]

Jedes Element ist ein Integer.

RDD[String]

Jedes Element ist eine String.

RDD[(String, Integer)]

Jedes Element ist ein Paar von (String, Integer).

Tabelle 1-1. Einfache RDDs
RDD[Integer] RDD[String] RDD[(String, Integer)]

2

"abc"

('A', 4)

-730

"fox is red"

('B', 7)

320

"Python is cool"

('ZZ', 9)

...

...

...

Tabelle 1-2 ist ein Beispiel für ein komplexes RDD. Jedes Element ist ein (Schlüssel, Wert)-Paar, wobei der Schlüssel eine String und der Wert ein Tripel von (Integer, Integer, Double) ist.

Tabelle 1-2. Komplexes RDD
RDD[(String, (Integer, Integer, Double))]

("cat", (20, 40, 1.8))

("cat", (30, 10, 3.9))

("lion king", (27, 32, 4.5))

("python is fun", (2, 3, 0.6))

...

Spark RDD-Operationen

Spark RDDs sind schreibgeschützt, unveränderlich und verteilt. Einmal erstellt, können sie nicht mehr verändert werden: Du kannst keine Datensätze hinzufügen, löschen oder aktualisieren. Sie können jedoch umgewandelt werden. RDDs unterstützen zwei Arten von Operationen: Transformationen, die das/die Quell-RDD(s) in ein oder mehrere neue RDDs umwandeln, und Aktionen, die das/die Quell-RDD(s) in ein Nicht-RDD-Objekt wie ein Wörterbuch oder Array umwandeln. Die Beziehung zwischen RDDs, Transformationen und Aktionen ist in Abbildung 1-9 dargestellt.

daws 0109
Abbildung 1-9. RDDs, Transformationen und Aktionen

Wir werden in den folgenden Kapiteln noch viel detaillierter auf die Spark-Transformationen eingehen, mit Arbeitsbeispielen, die dir helfen, sie zu verstehen, aber ich werde hier eine kurze Einführung geben.

Verwandlungen

Eine Transformation in Spark ist eine Funktion, die ein bestehendes RDD (das Quell-RDD) nimmt, eine Transformation darauf anwendet und ein neues RDD (das Ziel-RDD) erstellt. Beispiele hierfür sind: map(),flatMap(), groupByKey(), reduceByKey() undfilter().

Informell können wir eine Transformation wie folgt ausdrücken:

transformation: source_RDD[V] --> target_RDD[T] 1
1

Verwandle source_RDD vom Typ V intarget_RDD vom Typ T.

RDDs werden erst ausgewertet, wenn eine Aktion auf ihnen ausgeführt wird: Das bedeutet, dass Transformationen faul ausgewertet werden. Wenn ein RDD während einer Transformation fehlschlägt, baut die Datenlinie der Transformationen das RDD neu auf.

Die meisten Spark-Transformationen erstellen ein einzelnes RDD, aber es ist auch möglich, dass sie mehrere Ziel-RDDs erstellen. Die Ziel-RDD(s) können kleiner, größer oder gleich groß wie die Quell-RDD sein.

Das folgende Beispiel zeigt eine Abfolge von Umwandlungen:

tuples = [('A', 7), ('A', 8), ('A', -4),
          ('B', 3), ('B', 9), ('B', -1),
          ('C', 1), ('C', 5)]
rdd = spark.sparkContext.parallelize(tuples)

# drop negative values
positives = rdd.filter(lambda x: x[1] > 0)
positives.collect()
[('A', 7), ('A', 8), ('B', 3), ('B', 9), ('C', 1), ('C', 5)]

# find sum and average per key using groupByKey()
sum_and_avg = positives.groupByKey()
    .mapValues(lambda v: (sum(v), float(sum(v))/len(v)))

# find sum and average per key using reduceByKey()
# 1. create (sum, count) per key
sum_count = positives.mapValues(lambda v: (v, 1))
# 2. aggregate (sum, count) per key
sum_count_agg = sum_count.reduceByKey(lambda x, y:
     (x[0]+y[0], x[1]+y[1]))
# 3. finalize sum and average per key
sum_and_avg = sum_count_agg.mapValues(
    lambda v: (v[0], float(v[0])/v[1]))
Tipp

Die Transformation groupByKey() fasst die Werte für jeden Schlüssel im RDD in einer einzigen Sequenz zusammen, ähnlich wie eine SQL-Anweisung GROUP BY . Diese Transformation kann zu OOM-Fehlern (Out of Memory) führen, wenn die Daten über das Netzwerk von Spark-Servern gesendet und auf den Reducern/Workern gesammelt werden und die Anzahl der Werte pro Schlüssel in die Tausende oder Millionen geht.

Bei der Transformation reduceByKey()werden die Daten jedoch in jeder Partition kombiniert, so dass nur eine Ausgabe für jeden Schlüssel in jeder Partition über das Netzwerk der Spark-Server gesendet werden muss. Das macht sie skalierbarer als groupByKey(). reduceByKey() führt die Werte für jeden Schlüssel mithilfe einer assoziativen und kommutativen Reduktionsfunktion zusammen. Sie kombiniert alle Werte (pro Schlüssel) zu einem anderen Wert mit genau demselben Datentyp (dies ist eine Einschränkung, die durch die Verwendung dercombineByKey() Transformation überwunden werden kann). Insgesamt ist die reduceByKey() besser skalierbar als die groupByKey(). Wir werden in Kapitel 4 mehr über diese Themen sprechen.

Aktionen

Spark-Aktionen sind RDD-Operationen oder Funktionen, die Nicht-RDD-Werte erzeugen. Informell können wir eine Aktion wie folgt ausdrücken:

action: RDD => non-RDD value

Aktionen können die Auswertung von RDDs auslösen (die, wie du dich erinnern wirst, faul ausgewertet werden). Die Ausgabe einer Aktion ist jedoch ein konkreter Wert: eine gespeicherte Datei, ein Wert wie eine ganze Zahl, eine Anzahl von Elementen, eine Liste von Werten, ein Wörterbuch und so weiter.

Im Folgenden findest du Beispiele für Aktionen:

reduce()

Wendet eine Funktion an, um einen einzelnen Wert zu liefern, wie zum Beispiel das Addieren von Werten für eine bestimmte RDD[Integer]

collect()

Konvertiert eine RDD[T] in eine Liste vom Typ T

count()

Ermittelt die Anzahl der Elemente in einem gegebenen RDD

saveAsTextFile()

Speichert RDD-Elemente auf einer Festplatte

saveAsMap()

Speichert RDD[(K, V)] Elemente auf einer Festplatte alsdict[K, V]

DataFrame Beispiele

Ähnlich wie ein RDD ist ein DataFrame in Spark eine unveränderliche, verteilte Datensammlung. Im Gegensatz zu einem RDD sind die Daten jedoch in benannten Spalten organisiert, wie eine Tabelle in einer relationalen Datenbank. Dies soll die Verarbeitung großer Datensätze erleichtern. Mit Datenrahmen können Programmierer/innen einer verteilten Datensammlung eine Struktur auferlegen, die eine höhere Abstraktionsebene ermöglicht. Sie machen auch die Verarbeitung von CSV- und JSON-Dateien viel einfacher als mit RDDs.

Das folgende Beispiel eines Datenrahmens hat drei Spalten:

DataFrame[name, age, salary]
name: String, age: Integer, salary: Integer

+-----+----+---------+
| name| age|   salary|
+-----+----+---------+
|  bob|  33|    45000|
| jeff|  44|    78000|
| mary|  40|    67000|
|  ...| ...|      ...|
+-----+----+---------+

Ein DataFrame kann aus vielen verschiedenen Quellen erstellt werden, z. B. aus Hive-Tabellen, strukturierten Datendateien (SDF), externen Datenbanken oder bestehenden RDDs. Die DataFrames-API wurde für moderne Big-Data- und Data-Science-Anwendungen entwickelt und orientiert sich an den DataFrames in R und Pandas in Python. Wie wir in späteren Kapiteln sehen werden, können wir SQL-Abfragen gegen DataFrames ausführen.

Spark SQL verfügt über ein umfangreiches Set an leistungsstarken Datenrahmen-Operationen, darunter:

  • Aggregatfunktionen (Min, Max, Summe, Durchschnitt, etc.)

  • Funktionen der Sammlung

  • Mathe-Funktionen

  • Sortierfunktionen

  • String-Funktionen

  • Benutzerdefinierte Funktionen (UDFs)

Du kannst zum Beispiel ganz einfach eine CSV-Datei lesen und daraus einen Datenrahmen erstellen:

# define input path
virus_input_path = "s3://mybucket/projects/cases/case.csv"

# read CSV file and create a DataFrame
cases_dataframe = spark.read.load(virus_input_path,format="csv",
   sep=",", inferSchema="true", header="true")

# show the first 3 rows of created DataFrame
cases_dataframe.show(3)
+-------+-------+-----------+--------------+---------+
|case_id|country|       city|infection_case|confirmed|
+-------+-------+-----------+--------------+---------+
|  C0001|    USA|   New York|       contact|      175|
+-------+-------+-----------+--------------+---------+
|  C0008|    USA| New Jersey|       unknown|       25|
+-------+-------+-----------+--------------+---------+
|  C0009|    USA|  Cupertino|       contact|      100|
+-------+-------+-----------+--------------+---------+

Um die Ergebnisse nach der Anzahl der Fälle in absteigender Reihenfolge zu sortieren, können wir die Funktion sort() verwenden:

# We can do this using the F.desc function:
from pyspark.sql import functions as F
cases_dataframe.sort(F.desc("confirmed")).show()
+-------+-------+-----------+--------------+---------+
|case_id|country|       city|infection_case|confirmed|
+-------+-------+-----------+--------------+---------+
|  C0001|    USA|   New York|       contact|      175|
+-------+-------+-----------+--------------+---------+
|  C0009|    USA|  Cupertino|       contact|      100|
+-------+-------+-----------+--------------+---------+
|  C0008|    USA| New Jersey|       unknown|       25|
+-------+-------+-----------+--------------+---------+

Wir können auch ganz einfach Zeilen filtern:

cases_dataframe.filter((cases_dataframe.confirmed > 100) &
                       (cases_dataframe.country == 'USA')).show()

+-------+-------+-----------+--------------+---------+
|case_id|country|       city|infection_case|confirmed|
+-------+-------+-----------+--------------+---------+
|  C0001|    USA|   New York|       contact|      175|
+-------+-------+-----------+--------------+---------+
...

Um dir einen besseren Eindruck von der Leistungsfähigkeit der DataFrames von Spark zu geben, gehen wir ein Beispiel durch. Wir erstellen einen Datenrahmen und ermitteln den Durchschnitt und die Summe der Arbeitsstunden der Mitarbeiter pro Abteilung:

# Import required libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, sum

# Create a DataFrame using SparkSession
spark = SparkSession.builder.appName("demo").getOrCreate()
dept_emps = [("Sales", "Barb", 40), ("Sales", "Dan", 20),
             ("IT", "Alex", 22), ("IT", "Jane", 24),
             ("HR", "Alex", 20), ("HR", "Mary", 30)]
df = spark.createDataFrame(dept_emps, ["dept", "name", "hours"])

# Group the same depts together, aggregate their hours, and compute an average
averages = df.groupBy("dept")
   .agg(avg("hours").alias('average'),
        sum("hours").alias('total'))

# Show the results of the final execution
averages.show()
+-----+--------+------+
| dept| average| total|
+-----+--------+------+
|Sales|    30.0|  60.0|
|   IT|    23.0|  46.0|
|   HR|    25.0|  50.0|
+-----+--------+------+

Wie du siehst, sind die Datenrahmen von Spark leistungsfähig genug, um Milliarden von Zeilen mit einfachen, aber leistungsstarken Funktionen zu bearbeiten.

Verwendung der PySpark Shell

Es gibt zwei Möglichkeiten, wie du PySpark nutzen kannst:

  • Verwende die PySpark-Shell (zum Testen und interaktiven Programmieren).

  • PySpark in einer in sich geschlossenen Anwendung verwenden. In diesem Fall schreibst du ein Python-Treiberprogramm (z. B.my_pyspark_program.py) mit Hilfe der PySpark-API und führst es dann mit dem Befehl spark-submitaus:

    export SUBMIT=$SPARK_HOME/bin/spark-submit
    $SUBMIT [options] my_pyspark_program.py <parameters>

    wobei <parameters> eine Liste von Parametern ist, die von deinem PySpark-Programm(my_pyspark_program.py) verwendet werden.

Hinweis

Einzelheiten zur Verwendung des Befehls spark-submitfindest du unter"Einreichen von Anträgen" in der Spark-Dokumentation.

In diesem Abschnitt konzentrieren wir uns auf die interaktive Shell von Spark für Python-Benutzer, ein leistungsstarkes Tool, mit dem du Daten interaktiv analysieren und die Ergebnisse sofort sehen kannst (Spark bietet auch eine Scala-Shell). Die PySpark-Shell kann sowohl auf Einzelmaschinen- als auch auf Cluster-Installationen von Spark eingesetzt werden. Du startest die Shell mit folgendem Befehl, wobei SPARK_HOME das Installationsverzeichnis von Spark auf deinem System bezeichnet:

export SPARK_HOME=<spark-installation-directory>
$SPARK_HOME/bin/pyspark

Zum Beispiel:

export SPARK_HOME="/home/spark" 1
$SPARK_HOME/bin/pyspark 2
Python 3.7.2

Welcome to Spark version 3.1.2
Using Python version 3.7.2
SparkSession available as spark.
SparkContext available as sc
>>>
1

Lege das Spark-Installationsverzeichnis fest.

2

Rufe die PySpark-Shell auf.

Wenn du die Shell startest, zeigt PySpark einige nützliche Informationen an, darunter Details zu den verwendeten Python- und Spark-Versionen (beachte, dass die Ausgabe hier gekürzt wurde). Das Symbol >>> wird als Eingabeaufforderung für die PySpark-Shell verwendet. Diese Eingabeaufforderung zeigt an, dass du jetzt Python- oder PySpark-Befehle schreiben und die Ergebnisse ansehen kannst.

Damit du dich mit der PySpark-Shell vertraut machen kannst, führen wir dich in den folgenden Abschnitten durch einige grundlegende Anwendungsbeispiele.

Starten der PySpark Shell

Um in eine PySpark-Shell zu gelangen, führen wir pyspark wie folgt aus:

$SPARK_HOME/bin/pyspark  1
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.1.2
      /_/

SparkSession available as 'spark'.
SparkContext available as 'sc'.
>>> sc.version 2
'3.1.2'
>>> spark.version 3
'3.1.2'
1

Wenn du pyspark ausführst, wird eine neue Shell erstellt. Die Ausgabe wurde hier gekürzt.

2

Vergewissere dich, dass SparkContext als sc angelegt ist.

3

Vergewissere dich, dass SparkSession als spark angelegt ist.

Sobald du die PySpark-Shell aufrufst, wird eine Instanz von SparkSession als Variable spark und eine Instanz von SparkContext als Variable sc erstellt. Wie du bereits in diesem Kapitel gelernt hast, ist SparkSession der Einstiegspunkt in die Programmierung von Spark mit den Dataset- und DataFrame-APIs. Mit SparkSession kannst du Datenrahmen erstellen, Datenrahmen als Tabellen registrieren, SQL über Tabellen ausführen, Tabellen zwischenspeichern und CSV-, JSON- und Parquet-Dateien lesen. Wenn du PySpark in einer eigenständigen Anwendung verwenden möchtest, musst du explizit ein SparkSession erstellen, indem du das in "Spark-Architektur in Kürze" gezeigte Builder-Muster verwendest . SparkContext ist der Haupteinstiegspunkt für Spark-Funktionen; mit ihm können RDDs aus Textdateien und Python-Sammlungen erstellt werden. Das werden wir uns als Nächstes ansehen.

Ein RDD aus einer Sammlung erstellen

Mit Spark können wir neue RDDs aus Dateien und Sammlungen (Datenstrukturen wie Arrays und Listen) erstellen. Hier verwenden wir SparkContext.parallelize(), um ein neues RDD aus einer Sammlung (dargestellt als data) zu erstellen:

>>> data = [ 1
    ("fox", 6), ("dog", 5), ("fox", 3), ("dog", 8),
    ("cat", 1), ("cat", 2), ("cat", 3), ("cat", 4)
]

>>># use SparkContext (sc) as given by the PySpark shell
>>># create an RDD as rdd
>>> rdd = sc.parallelize(data) 2
>>> rdd.collect() 3
[
 ('fox', 6), ('dog', 5), ('fox', 3), ('dog', 8),
 ('cat', 1), ('cat', 2), ('cat', 3), ('cat', 4)
]
>>> rdd.count() 4
8
1

Definiere deine Python-Sammlung.

2

Erstelle ein neues RDD aus einer Python-Sammlung.

3

Zeige den Inhalt des neuen RDDs an.

4

Zähle die Anzahl der Elemente im RDD.

Aggregieren und Zusammenführen von Werten von Schlüsseln

Die Transformation reduceByKey() wird zum Zusammenführen und Aggregieren von Werten verwendet. In diesem Beispiel beziehen sich x und yauf die Werte desselben Schlüssels:

>>> sum_per_key = rdd.reduceByKey(lambda x, y : x+y) 1
>>> sum_per_key.collect() 2
[
 ('fox', 9),
 ('dog', 13),
 ('cat', 10)
]
1

Werte desselben Schlüssels zusammenführen und aggregieren.

2

Sammle die Elemente des RDDs.

Das Quell-RDD für diese Transformation muss aus (Schlüssel, Wert)-Paaren bestehen. reduceByKey() führt die Werte für jeden Schlüssel mithilfe einer assoziativen und kommutativen Reduktionsfunktion zusammen. Diese führt die Zusammenführung auch lokal auf jedem Mapper durch, bevor sie die Ergebnisse an einen Reducer sendet, ähnlich wie ein "Combiner" in MapReduce. Die Ausgabewird mit numPartitionspartitioniert oder mit dem Standard-Parallelitätslevel, wenn numPartitions nicht angegeben ist. Der Standardpartitionierer ist HashPartitioner.

Wenn T der Typ des Wertes für (Schlüssel, Wert) Paare ist, dann kann reduceByKey()'s func() definiert werden als:

# source_rdd : RDD[(K, T)]
# target_rdd : RDD[(K, T)]
target_rdd = source_rdd.reduceByKey(lambda x, y: func(x, y))
# OR you may write it by passing the function name
# target_rdd = source_rdd.reduceByKey(func)
# where
#      func(T, T) -> T
# Then you may define `func()` in Python as:
# x: type of T
# y: type of T
def func(x, y):
  result = <aggregation of x and y: return a result of type T>
  return result
#end-def

Das bedeutet, dass:

  • Es gibt zwei Eingangsargumente (vom gleichen Typ, T) für den Reducer func().

  • Der Rückgabetyp von func() muss derselbe sein wie der Eingabetyp T (diese Einschränkung kann vermieden werden, wenn du die Transformation combineByKey() verwendest).

  • Der Verkleinerer func() muss assoziativ sein. Informell wird eine binäre Operation f() auf einer Menge T als assoziativ bezeichnet, wenn sie das Assoziationsgesetz erfüllt, das besagt, dass die Reihenfolge, in der die Zahlen gruppiert werden, das Ergebnis der Operation nicht verändert.

    Vereinigungsrecht

    f(f(x, y), z) = f(x, f(y, z))

    Beachte, dass das Assoziativgesetz für Addition (+) und Multiplikation (*) gilt, aber nicht für Subtraktion (-) oder Division (/).

  • Der Verkleinerer func() muss kommutativ sein: informell eine Funktion f() für die f(x, y) = f(y, x) für alle Werte von x und y. Das heißt, dass eine Änderung der Reihenfolge der Zahlen keinen Einfluss auf das Ergebnis der Operation haben darf.

    Kommutativgesetz

    f(x, y) = f(y, x)

    Das Kommutativgesetz gilt auch für Addition und Multiplikation, aber nicht für Subtraktion oder Division. Ein Beispiel:

    5 + 3 = 3 + 5 aber 5 - 3 ≠ 3 - 5

Deshalb darfst du keine Subtraktions- oder Divisionsoperationen in einer reduceByKey() Transformation verwenden.

Die Elemente eines RDDs filtern

Als Nächstes verwenden wir die Transformation filter(), um ein neues RDD zurückzugeben, das nur die Elemente enthält, die ein Prädikat erfüllen:

>>> sum_filtered = sum_per_key.filter(lambda x : x[1] > 9) 1
>>> sum_filtered.collect() 2
[
 ('cat', 10),
 ('dog', 13)
]
1

Behalte die (Schlüssel, Wert) Paare, wenn der Wert größer als 9 ist.

2

Sammle die Elemente des RDDs.

Ähnliche Schlüssel gruppieren

Wir können die Transformation groupByKey() verwenden, um die Werte für jeden Schlüssel im RDD in einer einzigen Sequenz zusammenzufassen:

>>> grouped = rdd.groupByKey() 1
>>> grouped.collect() 2
[
 ('fox', <ResultIterable object at 0x10f45c790>), 3
 ('dog', <ResultIterable object at 0x10f45c810>),
 ('cat', <ResultIterable object at 0x10f45cd90>)
]
>>>
>>># list(v) converts v as a ResultIterable into a list
>>> grouped.map(lambda (k,v) : (k, list(v))).collect()  4
[
 ('fox', [6, 3]),
 ('dog', [5, 8]),
 ('cat', [1, 2, 3, 4])
]
1

Gruppiere Elemente mit demselben Schlüssel zu einer Folge von Elementen.

2

Schau dir das Ergebnis an.

3

Der vollständige Name von ResultIterable ist pyspark.resultiterable.ResultIterable.

4

Wende zuerst map() und dann collect() an, die eine Liste zurückgeben, die alle Elemente des resultierenden RDDs enthält. Die Funktionlist() wandelt ResultIterable in eine Liste von Objekten um.

Das Quell-RDD für diese Transformation muss aus (Schlüssel, Wert)-Paaren bestehen. groupByKey() gruppiert die Werte für jeden Schlüssel im RDD zu einer einzigen Sequenz und teilt das resultierende RDD mit numPartitions Partitionen oder mit dem Standard-Parallelitätsgrad auf, wenn numPartitions nicht angegeben ist. Wenn du eine Gruppierung (mit der TransformationgroupByKey() ) vornimmst, um eine Aggregation (z. B. eine Summe oder einen Durchschnitt) über jeden Schlüssel durchzuführen, wird die Verwendung von reduceByKey()oder aggregateByKey() eine viel bessere Leistung bringen.

Aggregieren von Werten für ähnliche Schlüssel

Um die Werte für jeden Schlüssel zu aggregieren und zu summieren, können wir die Transformation mapValues() und die Funktion sum() verwenden:

>>> aggregated = grouped.mapValues(lambda values : sum(values)) 1
>>> aggregated.collect() 2
[
 ('fox', 9),
 ('dog', 13),
 ('cat', 10)
]
1

values ist eine Folge von Werten pro Schlüssel. Wir leiten jeden Wert im (Schlüssel, Wert) Paar RDD durch eine Mapper-Funktion (indem wir alle valuesmit sum(values) hinzufügen), ohne die Schlüssel zu ändern.

2

Zum Debuggen geben wir eine Liste zurück, die alle Elemente in diesem RDD enthält.

Wir haben mehrere Möglichkeiten, Werte zu aggregieren und zu summieren: reduceByKey() und groupByKey(), um nur einige zu nennen. Im Allgemeinen ist die TransformationreduceByKey() effizienter als die Transformation groupByKey(). Weitere Einzelheiten dazu findest du in Kapitel 4.

Wie du in den folgenden Kapiteln sehen wirst, verfügt Spark über viele weitere leistungsstarke Transformationen, die ein RDD in ein neues RDD umwandeln können. Wie bereits erwähnt, sind RDDs schreibgeschützt, unveränderlich und verteilt. RDD-Transformationen geben einen Zeiger auf ein neues RDD zurück und ermöglichen es dir, Abhängigkeiten zwischen RDDs zu erstellen. Jedes RDD in der Abhängigkeitskette (oder Kette von Abhängigkeiten) hat eine Funktion zur Berechnung seiner Daten und einen Zeiger (Abhängigkeit) auf sein Eltern-RDD.

Datenanalyse-Tools für PySpark

Jupyter

Jupyter ist ein großartiges Werkzeug, um Programme zu testen und Prototypen zu erstellen. PySpark kann auch in Jupyter-Notizbüchern verwendet werden; es ist sehr praktisch für die explorative Datenanalyse.

Apache Zeppelin

Zeppelin ist ein webbasiertes Notizbuch, das datengesteuerte, interaktive Datenanalysen und kollaborative Dokumente mit SQL, Python, Scala und mehr ermöglicht.

ETL-Beispiel mit Datenrahmen

In der Datenanalyse und -verarbeitung ist ETL das allgemeine Verfahren zum Kopieren von Daten aus einer oder mehreren Quellen in ein Zielsystem, das die Daten anders als die Quelle(n) oder in einem anderen Kontext als die Quelle(n) darstellt. Hier werde ich zeigen, wie Spark ETL möglich und einfach macht.

Für dieses ETL-Beispiel verwende ich die Daten der Volkszählung 2010 im JSON-Format(census_2010.json):

$ wc -l census_2010.json
101 census_2010.json

$ head -5 census_2010.json
{"females": 1994141, "males": 2085528, "age": 0, "year": 2010}
{"females": 1997991, "males": 2087350, "age": 1, "year": 2010}
{"females": 2000746, "males": 2088549, "age": 2, "year": 2010}
{"females": 2002756, "males": 2089465, "age": 3, "year": 2010}
{"females": 2004366, "males": 2090436, "age": 4, "year": 2010}
Hinweis

Diese Daten stammen vom U.S. Census Bureau, das zum Zeitpunkt der Erstellung dieses Buches nur die binären Optionen männlich und weiblich zur Verfügung stellt. Wir bemühen uns, so umfassend wie möglich zu sein, und hoffen, dass nationale Datensätze wie diese in Zukunft mehr Optionen bieten werden.

Definieren wir unseren ETL-Prozess:

Extraktion

Zuerst erstellen wir einen Datenrahmen aus einem gegebenen JSON-Dokument.

Transformation

Dann filtern wir die Daten und behalten die Datensätze für Senioren (age > 54). Als Nächstes fügen wir eine neue Spalte hinzu, total, die die Summe der männlichen und weiblichen Daten darstellt.

Laden

Schließlich schreiben wir den überarbeiteten Datenrahmen in eine MySQL-Datenbank und überprüfen den Ladevorgang.

Lass uns diesen Prozess ein bisschen genauer untersuchen.

Extraktion

Um eine richtige Extraktion durchzuführen, müssen wir zunächst eine Instanz der Klasse SparkSession erstellen:

from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master("local") \
    .appName("ETL") \
    .getOrCreate()

Als Nächstes lesen wir das JSON und erstellen einen Datenrahmen:

>>> input_path = "census_2010.json"
>>> census_df = spark.read.json(input_path)
>>> census_df.count()
101
>>> census_df.show(200)
+---+-------+-------+----+
|age|females|  males|year|
+---+-------+-------+----+
|  0|1994141|2085528|2010|
|  1|1997991|2087350|2010|
|  2|2000746|2088549|2010|
...
| 54|2221350|2121536|2010|
| 55|2167706|2059204|2010|
| 56|2106460|1989505|2010|
...
| 98|  35778|   8321|2010|
| 99|  25673|   4612|2010|
+---+-------+-------+----+
only showing top 100 rows

Transformation

Die Umwandlung kann viele Prozesse umfassen, deren Zweck es ist, die Daten zu bereinigen, zu formatieren oder Berechnungen durchzuführen, um sie an deine Anforderungen anzupassen. Du kannst zum Beispiel fehlende oder doppelte Daten entfernen, Spalten zusammenführen, um neue Spalten zu erstellen, oder bestimmte Zeilen oder Spalten herausfiltern. Sobald wir den Datenrahmen durch den Extraktionsprozess erstellt haben, können wir viele nützliche Transformationen durchführen, wie z. B. nur die Senioren auswählen:

>>> seniors = census_df[census_df['age'] > 54]
>>> seniors.count()
46
>>> seniors.show(200)
+---+-------+-------+----+
|age|females|  males|year|
+---+-------+-------+----+
| 55|2167706|2059204|2010|
| 56|2106460|1989505|2010|
| 57|2048896|1924113|2010|
...
| 98|  35778|   8321|2010|
| 99|  25673|   4612|2010|
|100|  51007|   9506|2010|
+---+-------+-------+----+

Als Nächstes erstellen wir eine neue aggregierte Spalte namens total, in der die Anzahl der Männer und Frauen addiert wird:

>>> from pyspark.sql.functions import lit
>>> seniors_final = seniors.withColumn('total',
  lit(seniors.males + seniors.females))
>>> seniors_final.show(200)
+---+-------+-------+----+-------+
|age|females|  males|year|  total|
+---+-------+-------+----+-------+
| 55|2167706|2059204|2010|4226910|
| 56|2106460|1989505|2010|4095965|
| 57|2048896|1924113|2010|3973009|
...
| 98|  35778|   8321|2010|  44099|
| 99|  25673|   4612|2010|  30285|
|100|  51007|   9506|2010|  60513|
+---+-------+-------+----+-------+

Laden

Der Ladevorgang beinhaltet das Speichern oder Schreiben der endgültigen Ausgabe des Transformationsschritts. Hier werden wir den seniors_final Datenrahmen in eine MySQL-Tabelle schreiben:

seniors_final\
  .write\
  .format("jdbc")\
  .option("driver", "com.mysql.jdbc.Driver")\
  .mode("overwrite")\
  .option("url", "jdbc:mysql://localhost/testdb")\
  .option("dbtable", "seniors")\
  .option("user", "root")\
  .option("password", "root_password")\
  .save()

Der letzte Schritt des Ladens ist die Überprüfung des Ladevorgangs:

$ mysql -uroot -p
Enter password: <password>
Your MySQL connection id is 9
Server version: 5.7.30 MySQL Community Server (GPL)

mysql> use testdb;
Database changed
mysql> select * from seniors;
+------+---------+---------+------+---------+
| age  | females | males   | year | total   |
+------+---------+---------+------+---------+
|   55 | 2167706 | 2059204 | 2010 | 4226910 |
|   56 | 2106460 | 1989505 | 2010 | 4095965 |
|   57 | 2048896 | 1924113 | 2010 | 3973009 |
...
|   98 |   35778 |    8321 | 2010 |   44099 |
|   99 |   25673 |    4612 | 2010 |   30285 |
|  100 |   51007 |    9506 | 2010 |   60513 |
+------+---------+---------+------+---------+
46 rows in set (0.00 sec)

Zusammenfassung

Fassen wir einige wichtige Punkte des Kapitels zusammen:

  • Spark ist eine schnelle und leistungsstarke Unified-Analytics-Engine (bis zu hundertmal schneller als herkömmliches Hadoop MapReduce) und bietet robuste, verteilte, fehlertolerante Datenabstraktionen (sogenannte RDDs und DataFrames). Spark lässt sich über die Pakete MLlib (Bibliothek für maschinelles Lernen) und GraphX (Graphenbibliothek) in die Welt des maschinellen Lernens und der Graphikanalyse integrieren.

  • Du kannst die Transformationen und Aktionen von Spark in vier Programmiersprachen nutzen: Java, Scala, R und Python. Mit PySpark (der Python-API für Spark) kannst du Big-Data-Probleme lösen und deine Daten effizient in das gewünschte Ergebnis und Format transformieren.

  • Big Data kann mit den Datenabstraktionen von Spark dargestellt werden (RDDs, Datenrahmen und Datensätze - alle diese sind verteilte Datensätze).

  • Du kannst PySpark über die PySpark-Shell (mit dem Befehl pyspark von einer Kommandozeile aus) für die interaktive Spark-Programmierung ausführen. Mit der PySpark-Shell kannst du RDDs und Datenrahmen erstellen und manipulieren.

  • Du kannst eine eigenständige PySpark-Anwendung mit dem Befehl spark-submit an einen Spark-Cluster übermitteln; eigenständige Anwendungen mit PySpark werden in Produktionsumgebungen eingesetzt.

  • Spark bietet viele Transformationen und Aktionen zur Lösung von Big-Data-Problemen, die sich in ihrer Leistung unterscheiden (z. B.reduceByKey() gegenüber groupByKey()und combineByKey() gegenüber groupByKey()).

Das nächste Kapitel befasst sich mit einigen wichtigen Spark-Transformationen.

Get Datenalgorithmen mit Spark now with the O’Reilly learning platform.

O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.