Kapitel 4. Strukturierte API Übersicht

Diese Arbeit wurde mithilfe von KI übersetzt. Wir freuen uns über dein Feedback und deine Kommentare: translation-feedback@oreilly.com

Dieser Teil des Buches wird ein tiefer Einblick in die Structured APIs von Spark sein. Die strukturierten APIs sind ein Werkzeug zur Bearbeitung aller Arten von Daten, von unstrukturierten Logdateien über halbstrukturierte CSV-Dateien bis hin zu hochstrukturierten Parquet-Dateien. Diese APIs beziehen sich auf drei Kerntypen von verteilten Sammel-APIs:

  • Datensätze

  • Datenrahmen

  • SQL-Tabellen und Ansichten

Obwohl es sich um unterschiedliche Teile des Buches handelt, gilt der Großteil der strukturierten APIs sowohl für Batch- als auch für Streaming-Berechnungen. Das bedeutet, dass du bei der Arbeit mit den strukturierten APIs ohne großen Aufwand von Batch auf Streaming (oder umgekehrt) umsteigen kannst. Wir werden das Streaming in Teil V ausführlich behandeln.

Die strukturierten APIs sind die grundlegende Abstraktion, mit der du die meisten deiner Datenströme schreiben wirst. Bislang haben wir in diesem Buch einen tutoriellen Ansatz verfolgt und uns durch vieles, was Spark zu bieten hat, durchgeschlagen. Dieser Teil bietet eine tiefergehende Erkundung. In diesem Kapitel stellen wir dir die grundlegenden Konzepte vor, die du verstehen solltest: die typisierten und untypisierten APIs (und ihre Unterschiede); die Kernterminologie; und schließlich, wie Spark deine strukturierten API-Datenströme aufnimmt und im Cluster ausführt. Anschließend werden wir dir spezielle aufgabenbezogene Informationen für die Arbeit mit bestimmten Datentypen oder Datenquellen geben.

Hinweis

Bevor wir auf fortfahren, wollen wir die grundlegenden Konzepte und Definitionen aus Teil I wiederholen. Spark ist ein verteiltes Programmiermodell, bei dem der Benutzer Transformationen festlegt. Mehrere Transformationen bilden einen gerichteten azyklischen Graphen von Anweisungen. Eine Aktion beginnt mit der Ausführung dieses Graphen von Anweisungen als einzelner Auftrag, indem er in Phasen und Aufgaben aufgeteilt wird, die im gesamten Cluster ausgeführt werden. Die logischen Strukturen, die wir mit Transformationen und Aktionen manipulieren, sind Datenrahmen und Datensätze. Um einen neuen Datenrahmen oder ein neues Dataset zu erstellen, rufst du eine Transformation auf. Um eine Berechnung zu starten oder in einen muttersprachlichen Typ umzuwandeln, rufst du eine Aktion auf.

Datenrahmen und Datensätze

Teil I behandelte Datenrahmen. Spark hat zwei Begriffe für strukturierte Sammlungen: Datenrahmen und Datensätze. Wir werden in Kürze auf die (feinen) Unterschiede eingehen, aber zuerst wollen wir definieren, was beide Begriffe bedeuten.

Datenrahmen und Datensätze sind (verteilte) tabellenartige Sammlungen mit genau definierten Zeilen und Spalten. Jede Spalte muss die gleiche Anzahl von Zeilen haben wie alle anderen Spalten (obwohl du mit null das Fehlen eines Wertes angeben kannst) und jede Spalte hat Typinformationen, die für jede Zeile in der Sammlung konsistent sein müssen. Für Spark stellen Datenrahmen und Datensätze unveränderliche, langsam ausgewertete Pläne dar, die angeben, welche Operationen auf die Daten an einem bestimmten Ort angewendet werden sollen, um eine bestimmte Ausgabe zu erzeugen. Wenn wir eine Aktion an einem Datenrahmen durchführen, weisen wir Spark an, die eigentlichen Umwandlungen vorzunehmen und das Ergebnis zurückzugeben. Dies sind Pläne, die festlegen, wie Zeilen und Spalten manipuliert werden sollen, um das vom Nutzer gewünschte Ergebnis zu berechnen.

Hinweis

Tabellen und Ansichten sind im Grunde das Gleiche wie Datenrahmen. Wir führen nur SQL anstelle von DataFrame-Code auf ihnen aus. All das wird in Kapitel 10 behandelt, das sich speziell mit Spark SQL beschäftigt.

Um diese Definitionen ein wenig zu präzisieren, müssen wir über Schemata sprechen, mit denen du die Datentypen definierst, die du in dieser verteilten Sammlung speicherst.

Schemata

Ein Schema definiert die Spaltennamen und -typen eines Datenrahmens (DataFrame). Du kannst Schemas manuell definieren oder ein Schema aus einer Datenquelle lesen (oft Schema on Read genannt). Schemas bestehen aus Typen, was bedeutet, dass du eine Möglichkeit brauchst, um festzulegen, was wo liegt.

Übersicht über die strukturierten Spark-Typen

Spark ist praktisch eine eigene Programmiersprache. Intern verwendet Spark eine Engine namens Catalyst, die bei der Planung und Verarbeitung der Arbeit ihre eigenen Typinformationen verwaltet. Dadurch wird eine Vielzahl von Ausführungsoptimierungen möglich, die erhebliche Unterschiede machen. Spark-Typen werden direkt auf die verschiedenen Sprach-APIs abgebildet, die Spark verwaltet, und es gibt für jede dieser Sprachen eine Nachschlagetabelle in Scala, Java, Python, SQL und R. Selbst wenn wir die strukturierten APIs von Spark von Python oder R aus verwenden, werden die meisten unserer Manipulationen ausschließlich mit Spark-Typen und nicht mit Python-Typen durchgeführt. Der folgende Code führt zum Beispiel keine Addition in Scala oder Python durch, sondern ausschließlich in Spark:

// in Scala
val df = spark.range(500).toDF("number")
df.select(df.col("number") + 10)
# in Python
df = spark.range(500).toDF("number")
df.select(df["number"] + 10)

Diese Hinzufügung erfolgt, weil Spark einen in einer Eingabesprache geschriebenen Ausdruck in die interne Catalyst-Darstellung desselben Typs umwandelt. Dann wird auf diese interne Darstellung zugegriffen. Wir werden gleich darauf eingehen, warum dies der Fall ist, aber vorher müssen wir über Datensätze sprechen.

Datenrahmen vs. Datensätze

Unter gibt es innerhalb der strukturierten APIs zwei weitere APIs, die "untypisierten" DataFrames und die "typisierten" Datasets. Zu sagen, dass Datenrahmen untypisiert sind, ist etwas ungenau; sie haben Typen, aber Spark verwaltet sie vollständig und prüft nur zur Laufzeit, ob diese Typen mit den im Schema angegebenen übereinstimmen. Datasets hingegen prüfen zur Kompilierungszeit, ob die Typen mit der Spezifikation übereinstimmen. Datasets sind nur für Sprachen verfügbar, die auf der Java Virtual Machine (JVM) basieren (Scala und Java), und wir spezifizieren Typen mit Fallklassen oder Java Beans.

Unter wirst du wahrscheinlich mit Datenrahmen arbeiten. Für Spark (in Scala) sind DataFrames einfach Datensätze vom Typ Row. Der Typ "Row" ist Sparks interne Darstellung des optimierten In-Memory-Formats für Berechnungen. Dieses Format ermöglicht hochspezialisierte und effiziente Berechnungen, denn anstatt JVM-Typen zu verwenden, die hohe Kosten für die Speicherbereinigung und die Instanziierung von Objekten verursachen können, kann Spark mit seinem eigenen internen Format arbeiten, ohne dass diese Kosten anfallen. Für Spark (in Python oder R) gibt es kein Dataset: Alles ist ein Datenrahmen und deshalb arbeiten wir immer mit diesem optimierten Format.

Hinweis

Das interne Catalyst-Format ist in zahlreichen Spark-Präsentationen gut beschrieben. Da dieses Buch für ein allgemeines Publikum gedacht ist, gehen wir nicht näher auf die Implementierung ein. Wenn du neugierig bist, gibt es einige hervorragende Vorträge von Josh Rosen und Herman van Hovell, beide von Databricks, über ihre Arbeit bei der Entwicklung der Catalyst-Engine von Spark.

Um DataFrames, Spark-Typen und Schemas zu verstehen, brauchst du etwas Zeit. Was du wissen musst, ist, dass du bei der Verwendung von Datenrahmen die Vorteile des optimierten internen Formats von Spark nutzt. Dieses Format wendet die gleichen Effizienzgewinne auf alle Sprach-APIs von Spark an. Wenn du eine strenge Kompilierzeitüberprüfung benötigst, lies Kapitel 11, um mehr darüber zu erfahren.

Kommen wir zu freundlicheren und leichter zugänglichen Konzepten: Spalten und Zeilen.

Rubriken

Spalten repräsentieren einen einfachen Typ wie eine Ganzzahl oder einen String, einen komplexen Typ wie ein Array oder eine Map oder einen Nullwert. Spark verfolgt all diese Typinformationen für dich und bietet eine Vielzahl von Möglichkeiten, mit denen du Spalten umwandeln kannst. Spalten werden in Kapitel 5 ausführlich besprochen, aber im Großen und Ganzen kannst du dir die Typen von Spark Column wie Spalten in einer Tabelle vorstellen.

Reihen

Eine Zeile ist nichts anderes als ein Datensatz. Jeder Datensatz in einem Datenrahmen muss vom Typ Row sein, wie wir sehen können, wenn wir die folgenden Datenrahmen sammeln. Wir können diese Zeilen manuell aus SQL, aus Resilient Distributed Datasets (RDDs), aus Datenquellen oder manuell von Grund auf erstellen. Hier erstellen wir eine Zeile mit Hilfe eines Bereichs:

// in Scala
spark.range(2).toDF().collect()
# in Python
spark.range(2).collect()

Beide führen zu einem Array von Row Objekten.

Funkenarten

Wir haben bereits erwähnt, dass Spark eine große Anzahl interner Typendarstellungen hat. Auf den nächsten Seiten findest du eine praktische Referenztabelle, damit du ganz einfach nachschlagen kannst, welcher Typ in deiner Sprache dem Typ in Spark entspricht.

Bevor zu diesen Tabellen kommt, lass uns darüber sprechen, wie wir eine Spalte instanziieren oder deklarieren, damit sie einen bestimmten Typ hat.

Um mit den richtigen Scala-Typen zu arbeiten, verwende Folgendes:

import org.apache.spark.sql.types._
val b = ByteType

Um mit den richtigen Java-Typen zu arbeiten, solltest du die Factory-Methoden aus dem folgenden Paket verwenden:

import org.apache.spark.sql.types.DataTypes;
ByteType x = DataTypes.ByteType;

Python-Typen haben manchmal bestimmte Anforderungen, die du in Tabelle 4-1 siehst, ebenso wie Scala und Java, die du in Tabelle 4-2 bzw. 4-3 siehst. Um mit den richtigen Python-Typen zu arbeiten, verwende Folgendes:

from pyspark.sql.types import *
b = ByteType()

In den folgenden Tabellen findest du die detaillierten Typinformationen für jede Sprachbindung von Spark.

Tabelle 4-1. Python-Typ-Referenz
Datentyp Wertetyp in Python API für den Zugriff auf oder die Erstellung eines Datentyps

ByteType

int oder long. Hinweis: Die Zahlen werden zur Laufzeit in 1-Byte-Zahlen mit Vorzeichen umgewandelt. Achte darauf, dass die Zahlen innerhalb des Bereichs von -128 bis 127 liegen.

ByteType()

ShortType

int oder long. Hinweis: Zahlen werden zur Laufzeit in 2-Byte-Ganzzahlen mit Vorzeichen umgewandelt. Achte darauf, dass die Zahlen im Bereich von -32768 bis 32767 liegen.

ShortType()

IntegerType

int oder long. Hinweis: Python hat eine nachsichtige Definition von "Integer". Zu große Zahlen werden von Spark SQL zurückgewiesen, wenn du den IntegerType() verwendest. Es ist die bewährte Methode, LongType zu verwenden.

IntegerType()

LongType

lang. Hinweis: Die Zahlen werden zur Laufzeit in 8-Byte-Zahlen mit Vorzeichen umgewandelt. Achte darauf, dass die Zahlen im Bereich von -9223372036854775808 bis 9223372036854775807 liegen. Andernfalls konvertierst du die Daten in decimal.Decimal und verwendest DecimalType.

LongType()

FloatType

float. Hinweis: Die Zahlen werden zur Laufzeit in 4-Byte-Gleitkommazahlen mit einfacher Genauigkeit umgewandelt.

FloatType()

DoubleType

Schwimmer

DoubleType()

DecimalType

dezimal.dezimal

DecimalType()

StringType

String

StringType()

BinaryType

Bytearray

BinaryType()

BoolescherTyp

bool

BooleanType()

ZeitstempelTyp

datetime.datetime

ZeitstempelTyp()

DateType

datetime.date

DateType()

ArrayType

Liste, Tupel oder Array

ArrayType(elementType, [containsNull]). Hinweis: Der Standardwert für containsNull ist True.

MapType

Diktat

MapType(keyType, valueType, [valueContainsNull]). Hinweis: Der Standardwert von valueContainsNull ist True.

StructType

Liste oder Tupel

StructType(Felder). Hinweis: fields ist eine Liste von StructFields. Außerdem sind Felder mit demselben Namen nicht erlaubt.

StructField

Der Wertetyp in Python des Datentyps dieses Feldes (zum Beispiel Int für ein StructField mit dem Datentyp IntegerType)

StructField(name, dataType, [nullable]) Hinweis: Der Standardwert von nullable ist True.

Tabelle 4-2. Scala-Typ-Referenz
Datentyp Wertetyp in Scala API für den Zugriff auf oder die Erstellung eines Datentyps

ByteType

Byte

ByteType

ShortType

Kurz

ShortType

IntegerType

Int

IntegerType

LongType

Lang

LongType

FloatType

Schwimmer

FloatType

DoubleType

Doppelter

DoubleType

DecimalType

java.math.BigDecimal

DecimalType

StringType

String

StringType

BinaryType

Array[Byte]

BinaryType

BoolescherTyp

Boolesche

BoolescherTyp

ZeitstempelTyp

java.sql.Timestamp

ZeitstempelTyp

DateType

java.sql.Date

DateType

ArrayType

scala.collection.Seq

ArrayType(elementType, [containsNull]). Hinweis: Der Standardwert für containsNull ist true.

MapType

scala.collection.Map

MapType(keyType, valueType, [valueContainsNull]). Hinweis: Der Standardwert von valueContainsNull ist true.

StructType

org.apache.spark.sql.Row

StructType(fields). Hinweis: fields ist ein Array von StructFields. Außerdem sind Felder mit demselben Namen nicht erlaubt.

StructField

Der Wertetyp in Scala des Datentyps dieses Feldes (zum Beispiel Int für ein StructField mit dem Datentyp IntegerType)

StructField(name, dataType, [nullable]). Hinweis: Der Standardwert von nullable ist true.

Tabelle 4-3. Java-Typ-Referenz
Datentyp Wertetyp in Java API für den Zugriff auf oder die Erstellung eines Datentyps

ByteType

Byte oder Byte

DataTypes.ByteType

ShortType

kurz oder kurz

DataTypes.ShortType

IntegerType

int oder Integer

DataTypes.IntegerType

LongType

lang oder Lang

DataTypes.LongType

FloatType

float oder Float

DataTypes.FloatType

DoubleType

double oder Double

DataTypes.DoubleType

DecimalType

java.math.BigDecimal

DataTypes.createDecimalType() DataTypes.createDecimalType(precision, scale).

StringType

String

DataTypes.StringType

BinaryType

byte[]

DataTypes.BinaryType

BoolescherTyp

boolesch oder boolesch

DataTypes.BooleanType

ZeitstempelTyp

java.sql.Timestamp

DataTypes.TimestampType

DateType

java.sql.Date

DataTypes.DateType

ArrayType

java.util.Liste

DataTypes.createArrayType(elementType). Hinweis: Der Wert von containsNull ist true DataTypes.createArrayType(elementType, containsNull).

MapType

java.util.Map

DataTypes.createMapType(keyType, valueType). Hinweis: Der Wert von valueContainsNull wird true sein. DataTypes.createMapType(keyType, valueType, valueContainsNull)

StructType

org.apache.spark.sql.Row

DataTypes.createStructType(fields). Hinweis: fields ist eine Liste oder ein Array von StructFields. Außerdem sind zwei Felder mit demselben Namen nicht erlaubt.

StructField

Der Wertetyp in Java des Datentyps dieses Feldes (zum Beispiel int für ein StructField mit dem Datentyp IntegerType)

DataTypes.createStructField(name, dataType, nullable)

Es lohnt sich, daran zu denken, dass sich die Typen im Laufe der Zeit ändern können, da Spark SQL immer weiter wächst, daher solltest du dich in der Spark-Dokumentation über zukünftige Updates informieren. Natürlich sind alle diese Typen großartig, aber du arbeitest fast nie mit rein statischen Datenrahmen. Du wirst sie immer manipulieren und umwandeln. Deshalb ist es wichtig, dass wir dir einen Überblick über den Ausführungsprozess in den strukturierten APIs geben.

Überblick über die strukturierte API-Ausführung

In diesem Abschnitt von wird gezeigt, wie dieser Code tatsächlich in einem Cluster ausgeführt wird. Das wird dir helfen, den Prozess des Schreibens und Ausführens von Code auf Clustern zu verstehen (und möglicherweise zu debuggen). Gehen wir also die Ausführung einer einzelnen strukturierten API-Abfrage vom Benutzercode bis zum ausgeführten Code durch. Hier ist ein Überblick über die Schritte:

  1. Datenrahmen/Datensatz/SQL-Code schreiben.

  2. Wenn der Code gültig ist, wandelt Spark ihn in einen logischen Plan um.

  3. Spark wandelt diesen logischen Plan in einen physischen Plan um und prüft dabei auf Optimierungen.

  4. Spark führt dann diesen physischen Plan (RDD-Manipulationen) auf dem Cluster aus.

Um Code auszuführen, müssen wir Code schreiben. Dieser Code wird dann entweder über die Konsole oder über einen eingereichten Auftrag an Spark übermittelt. Der Code durchläuft dann den Catalyst Optimizer, der entscheidet, wie der Code ausgeführt werden soll, und einen Plan dafür aufstellt, bevor der Code ausgeführt und das Ergebnis an den Benutzer zurückgegeben wird. Abbildung 4-1 zeigt den Prozess.

image
Abbildung 4-1. Der Katalysator-Optimierer

Logische Planung

Die erste Ausführungsphase von dient dazu, den Benutzercode in einen logischen Plan umzuwandeln. Abbildung 4-2 veranschaulicht diesen Prozess.

image
Abbildung 4-2. Der strukturierte logische API-Planungsprozess

Der logische Plan stellt nur eine Reihe von abstrakten Transformationen dar, die sich nicht auf Executors oder Treiber beziehen, sondern lediglich dazu dienen, die Ausdrücke des Benutzers in die optimalste Version umzuwandeln. Dazu wird der Code des Benutzers in einen nicht aufgelösten logischen Plan umgewandelt. Dieser Plan ist unaufgelöst, weil dein Code zwar gültig ist, die Tabellen oder Spalten, auf die er sich bezieht, aber möglicherweise nicht existieren. Spark verwendet den Katalog, einen Speicher für alle Tabellen- und Datenrahmeninformationen, um Spalten und Tabellen im Analyzer aufzulösen. Der Analyzer kann den nicht aufgelösten logischen Plan zurückweisen, wenn der erforderliche Tabellen- oder Spaltenname nicht im Katalog existiert. Wenn der Analyzer ihn auflösen kann, wird das Ergebnis durch den Catalyst Optimizer geleitet, eine Sammlung von Regeln, die versuchen, den logischen Plan zu optimieren, indem sie Prädikate oder Selektionen nach unten schieben. Pakete können den Catalyst erweitern, um eigene Regeln für domänenspezifische Optimierungen hinzuzufügen.

Physikalische Planung

Nachdem erfolgreich einen optimierten logischen Plan erstellt hat, beginnt Spark mit dem physischen Planungsprozess. Der physische Plan, der oft als Spark-Plan bezeichnet wird, legt fest, wie der logische Plan auf dem Cluster ausgeführt wird, indem er verschiedene physische Ausführungsstrategien generiert und sie mithilfe eines Kostenmodells vergleicht (siehe Abbildung 4-3). Ein Beispiel für einen Kostenvergleich ist die Entscheidung, wie eine bestimmte Verknüpfung durchgeführt werden soll, indem die physischen Attribute einer bestimmten Tabelle (wie groß die Tabelle oder ihre Partitionen sind) betrachtet werden.

image
Abbildung 4-3. Der physische Planungsprozess

Die physische Planung resultiert in einer Reihe von RDDs und Transformationen. Vielleicht hast du schon einmal gehört, dass Spark als Compiler bezeichnet wird - es nimmt Abfragen in Datenrahmen, Datensätzen und SQL und kompiliert sie in RDD-Transformationen für dich.

Ausführung

Nach der Auswahl eines physischen Plans führt Spark den gesamten Code über RDDs aus, die untergeordnete Programmierschnittstelle von Spark (die wir in Teil III behandeln). Spark führt zur Laufzeit weitere Optimierungen durch und erzeugt nativen Java-Bytecode, der während der Ausführung ganze Aufgaben oder Phasen entfernen kann. Schließlich wird das Ergebnis an den Nutzer zurückgegeben.

Fazit

In diesem Kapitel haben wir uns mit den strukturierten APIs von Spark befasst und damit, wie Spark deinen Code so umwandelt, dass er physisch auf dem Cluster ausgeführt werden kann. In den folgenden Kapiteln geht es um die Kernkonzepte und die Nutzung der wichtigsten Funktionen der strukturierten APIs.

Get Spark: Der endgültige Leitfaden now with the O’Reilly learning platform.

O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.