Transformieren von Daten mit Apache Spark in Azure Databricks
Azure Databricks ist eine Microsoft Azure-basierte Version der beliebten Open-Source-Databricks-Plattform.
Azure Databricks basiert auf Apache Spark und bietet eine hoch skalierbare Lösung für Datentechnik- und Analyseaufgaben, die das Arbeiten mit Daten in Dateien beinhalten.
Allgemeine Datentransformationsaufgaben in Azure Databricks umfassen Datenbereinigung, Durchführen von Aggregationen und Typumwandlungen. Diese Transformationen sind unerlässlich für die Vorbereitung von Daten für die Analyse und sind Teil des größeren ETL-Prozesses (Extrahieren, Transformieren, Laden).
Diese Übung dauert ca. 30 Minuten.
Bereitstellen eines Azure Databricks-Arbeitsbereichs
Tipp: Wenn Sie bereits über einen Azure Databricks-Arbeitsbereich verfügen, können Sie dieses Verfahren überspringen und Ihren vorhandenen Arbeitsbereich verwenden.
Diese Übung enthält ein Skript zum Bereitstellen eines neuen Azure Databricks-Arbeitsbereichs. Das Skript versucht, eine Azure Databricks-Arbeitsbereichsressource im Premium-Tarif in einer Region zu erstellen, in der Ihr Azure-Abonnement über ein ausreichendes Kontingent für die in dieser Übung erforderlichen Computekerne verfügt. Es wird davon ausgegangen, dass Ihr Benutzerkonto über ausreichende Berechtigungen im Abonnement verfügt, um eine Azure Databricks-Arbeitsbereichsressource zu erstellen. Wenn das Skript aufgrund unzureichender Kontingente oder Berechtigungen fehlschlägt, können Sie versuchen, einen Azure Databricks-Arbeitsbereich interaktiv im Azure-Portal zu erstellen.
- Melden Sie sich in einem Webbrowser am Azure-Portal unter
https://portal.azure.com
an. -
Verwenden Sie rechts neben der Suchleiste oben auf der Seite die Schaltfläche [>_], um eine neue Cloud Shell-Instanz im Azure-Portal zu erstellen. Wählen Sie eine PowerShell-Umgebung aus, und erstellen Sie Speicher, falls Sie dazu aufgefordert werden. Die Cloud Shell bietet eine Befehlszeilenschnittstelle in einem Bereich am unteren Rand des Azure-Portals, wie hier gezeigt:
Hinweis: Wenn Sie zuvor eine Cloudshell erstellt haben, die eine Bash-Umgebung verwendet, verwenden Sie das Dropdownmenü links oben im Bereich „Cloudshell“, um sie in PowerShell zu ändern.
-
Beachten Sie, dass Sie die Größe der Cloud Shell durch Ziehen der Trennzeichenleiste oben im Bereich ändern können oder den Bereich mithilfe der Symbole —, ◻ und X oben rechts minimieren, maximieren und schließen können. Weitere Informationen zur Verwendung von Azure Cloud Shell finden Sie in der Azure Cloud Shell-Dokumentation.
-
Geben Sie im PowerShell-Bereich die folgenden Befehle ein, um dieses Repository zu klonen:
rm -r mslearn-databricks -f git clone https://github.com/MicrosoftLearning/mslearn-databricks
-
Nachdem das Repository geklont wurde, geben Sie den folgenden Befehl ein, um das Skript setup.ps1 auszuführen, das einen Azure Databricks-Arbeitsbereich in einer verfügbaren Region bereitstellt:
./mslearn-databricks/setup.ps1
- Wenn Sie dazu aufgefordert werden, wählen Sie aus, welches Abonnement Sie verwenden möchten (dies geschieht nur, wenn Sie Zugriff auf mehrere Azure-Abonnements haben).
- Warten Sie, bis das Skript abgeschlossen ist. Dies dauert in der Regel etwa 5 Minuten, in einigen Fällen kann es jedoch länger dauern. Während Sie warten, lesen Sie den Artikel Explorative Datenanalyse in Azure Databricks in der Dokumentation zu Azure Databricks.
Erstellen eines Clusters
Azure Databricks ist eine verteilte Verarbeitungsplattform, die Apache Spark-Cluster verwendet, um Daten parallel auf mehreren Knoten zu verarbeiten. Jeder Cluster besteht aus einem Treiberknoten, um die Arbeit zu koordinieren, und Arbeitsknoten zum Ausführen von Verarbeitungsaufgaben. In dieser Übung erstellen Sie einen Einzelknotencluster , um die in der Lab-Umgebung verwendeten Computeressourcen zu minimieren (in denen Ressourcen möglicherweise eingeschränkt werden). In einer Produktionsumgebung erstellen Sie in der Regel einen Cluster mit mehreren Workerknoten.
Tipp: Wenn Sie bereits über einen Cluster mit einer Runtime 13.3 LTS oder einer höheren Runtimeversion in Ihrem Azure Databricks-Arbeitsbereich verfügen, können Sie ihn verwenden, um diese Übung abzuschließen und dieses Verfahren zu überspringen.
- Navigieren Sie im Azure-Portal zur Ressourcengruppe msl-xxxxxxx, die vom Skript erstellt wurde (oder zur Ressourcengruppe, die Ihren vorhandenen Azure Databricks-Arbeitsbereich enthält).
- Wählen Sie die Ressource Ihres Azure Databricks-Diensts aus (sie trägt den Namen databricks-xxxxxxx, wenn Sie das Setupskript zum Erstellen verwendet haben).
-
Verwenden Sie auf der Seite Übersicht für Ihren Arbeitsbereich die Schaltfläche Arbeitsbereich starten, um Ihren Azure Databricks-Arbeitsbereich auf einer neuen Browserregisterkarte zu öffnen. Melden Sie sich an, wenn Sie dazu aufgefordert werden.
Tipp: Während Sie das Databricks-Arbeitsbereichsportal verwenden, werden möglicherweise verschiedene Tipps und Benachrichtigungen angezeigt. Schließen Sie diese, und folgen Sie den Anweisungen, um die Aufgaben in dieser Übung auszuführen.
- Wählen Sie zunächst in der Randleiste auf der linken Seite die Aufgabe (+) Neu und dann Cluster aus.
- Erstellen Sie auf der Seite Neuer Cluster einen neuen Cluster mit den folgenden Einstellungen:
- Clustername: Cluster des Benutzernamens (der Standardclustername)
- Richtlinie: Unrestricted
- Clustermodus: Einzelknoten
- Zugriffsmodus: Einzelner Benutzer (Ihr Benutzerkonto ist ausgewählt)
- Databricks-Runtimeversion: 13.3 LTS (Spark 3.4.1, Scala 2.12) oder höher
- Photonbeschleunigung verwenden: Ausgewählt
- Knotentyp: Standard_D4ds_v5
- Beenden nach 20 Minuten Inaktivität
- Warten Sie, bis der Cluster erstellt wurde. Es kann ein oder zwei Minuten dauern.
Hinweis: Wenn Ihr Cluster nicht gestartet werden kann, verfügt Ihr Abonnement möglicherweise über ein unzureichendes Kontingent in der Region, in der Ihr Azure Databricks-Arbeitsbereich bereitgestellt wird. Details finden Sie unter Der Grenzwert für CPU-Kerne verhindert die Clustererstellung. In diesem Fall können Sie versuchen, Ihren Arbeitsbereich zu löschen und in einer anderen Region einen neuen zu erstellen. Sie können einen Bereich als Parameter für das Setupskript wie folgt angeben:
./mslearn-databricks/setup.ps1 eastus
Erstellen eines Notebooks
-
Verwenden Sie in der Randleiste den Link ** (+) Neu, um ein **Notebook zu erstellen.
-
Ändern Sie den Standardnamen des Notebooks (Unbenanntes Notebook [Datum]) in
Transform data with Spark
und wählen Sie in der Dropdown-Liste Verbinden Ihren Cluster aus, falls er nicht bereits ausgewählt ist. Wenn der Cluster nicht ausgeführt wird, kann es eine Minute dauern, bis er gestartet wird.
Einlesen von Daten
-
Geben Sie in der ersten Zelle des Notebooks den folgenden Code ein, der mit Shell-Befehlen die Datendateien von GitHub in das von Ihrem Cluster verwendete Dateisystem herunterlädt.
%sh rm -r /dbfs/spark_lab mkdir /dbfs/spark_lab wget -O /dbfs/spark_lab/2019.csv https://raw.githubusercontent.com/MicrosoftLearning/mslearn-databricks/main/data/2019_edited.csv wget -O /dbfs/spark_lab/2020.csv https://raw.githubusercontent.com/MicrosoftLearning/mslearn-databricks/main/data/2020_edited.csv wget -O /dbfs/spark_lab/2021.csv https://raw.githubusercontent.com/MicrosoftLearning/mslearn-databricks/main/data/2021_edited.csv
- Verwenden Sie Menüoption ▸ Zelle Ausführen links neben der Zelle, um sie auszuführen. Warten Sie dann, bis der vom Code ausgeführte Spark-Auftrag, abgeschlossen ist.
-
Fügen Sie eine neue Codezelle hinzu, und verwenden Sie diese, um den folgenden Code auszuführen, der ein Schema für die Daten definiert:
from pyspark.sql.types import * from pyspark.sql.functions import * orderSchema = StructType([ StructField("SalesOrderNumber", StringType()), StructField("SalesOrderLineNumber", IntegerType()), StructField("OrderDate", DateType()), StructField("CustomerName", StringType()), StructField("Email", StringType()), StructField("Item", StringType()), StructField("Quantity", IntegerType()), StructField("UnitPrice", FloatType()), StructField("Tax", FloatType()) ]) df = spark.read.load('/spark_lab/*.csv', format='csv', schema=orderSchema) display(df.limit(100))
Bereinigen der Daten
Beachten Sie, dass dieses Dataset einige duplizierte Zeilen und null
-Werte in der Spalte Steuern enthält. Daher ist ein Bereinigungsschritt erforderlich, bevor eine weitere Verarbeitung und Analyse mit den Daten durchgeführt wird.
-
Verwenden Sie unter der vorhandenen Codezelle das Symbol +, um eine neue Codezelle hinzuzufügen. Geben Sie dann in der neuen Zelle den folgenden Code ein und führen Sie ihn aus, um doppelte Zeilen aus der Tabelle zu entfernen und die
null
-Einträge durch die richtigen Werte zu ersetzen:from pyspark.sql.functions import col df = df.dropDuplicates() df = df.withColumn('Tax', col('UnitPrice') * 0.08) df = df.withColumn('Tax', col('Tax').cast("float"))
Beachten Sie, dass nach dem Aktualisieren der Werte in der Spalte Steuern der Datentyp erneut auf float
festgelegt ist. Dies liegt daran, dass sich der Datentyp nach der Berechnung zu double
ändert. Da double
eine höhere Speicherauslastung hat als float
, ist es besser für die Leistung, wenn die Spalte zurück zum Typ float
umgewandelt wird.
Filtern eines Dataframes
- Fügen Sie eine neue Codezelle hinzu, und verwenden Sie diese, um den folgenden Code auszuführen, der Folgendes ausführt:
- Filtern der Spalten des DataFrame für Verkaufsaufträge, um nur den Kundennamen und die E-Mail-Adresse einzuschließen.
- Zählen der Gesamtzahl der Bestelldatensätze
- Zählen der Anzahl unterschiedlicher Kunden
- Anzeigen der unterschiedlichen Kunden
customers = df['CustomerName', 'Email'] print(customers.count()) print(customers.distinct().count()) display(customers.distinct())
Beachten Sie die folgenden Details:
- Wenn Sie einen Vorgang für einen Dataframe ausführen, ist das Ergebnis ein neuer Dataframe (in diesem Fall wird ein neuer customers-Dataframe erstellt, indem eine bestimmte Teilmenge von Spalten aus dem df-Dataframe ausgewählt wird).
- Dataframes bieten Funktionen wie count und distinct, die zum Zusammenfassen und Filtern der darin enthaltenen Daten verwendet werden können.
- Die Syntax
dataframe['Field1', 'Field2', ...]
ist eine Kurzform für die Definition einer Teilmenge von Spalten. Sie können auch die select-Methode verwenden, sodass die erste Zeile des obigen Codes wie folgt geschrieben werden kann:customers = df.select("CustomerName", "Email")
.
-
Jetzt wenden wir einen Filter an, um nur die Kunden einzuschließen, die eine Bestellung für ein bestimmtes Produkt aufgegeben haben, indem wir den folgenden Code in einer neuen Codezelle ausführen:
customers = df.select("CustomerName", "Email").where(df['Item']=='Road-250 Red, 52') print(customers.count()) print(customers.distinct().count()) display(customers.distinct())
Beachten Sie, dass Sie mehrere Funktionen miteinander „verketten“ können, sodass die Ausgabe der einen Funktion zur Eingabe für die nächste wird. In diesem Fall ist der von der select-Methode erstellte DataFrame der Quell-DataFrame für die where-Methode, die zum Anwenden von Filterkriterien verwendet wird.
Aggregieren und Gruppieren von Daten in einem Dataframe
-
Führen Sie den folgenden Code in einer neuen Codezelle aus, um die Bestelldaten zu aggregieren und zu gruppieren:
productSales = df.select("Item", "Quantity").groupBy("Item").sum() display(productSales)
Beachten Sie, dass in den Ergebnissen die Summe der Bestellmengen nach Produkt gruppiert angezeigt wird. Die groupBy-Methode gruppiert die Zeilen nach Item, und die nachfolgende sum-Aggregatfunktion wird auf alle verbleibenden numerischen Spalten angewendet (in diesem Fall Quantity).
-
Probieren wir in einer neuen Codezelle eine weitere Aggregation aus:
yearlySales = df.select(year("OrderDate").alias("Year")).groupBy("Year").count().orderBy("Year") display(yearlySales)
Dieses Mal zeigen die Ergebnisse die Anzahl der Verkaufsaufträge pro Jahr. Beachten Sie, dass die select-Methode eine SQL-Funktion year enthält, um die Jahreskomponente des Felds OrderDate zu extrahieren, und anschließend wird eine alias-Methode verwendet, um dem extrahierten Jahreswert einen Spaltennamen zuzuweisen. Die Daten werden dann nach der abgeleiteten Year-Spalte gruppiert. Anschließend wird mit count die Anzahl der Zeilen in jeder Gruppe berechnet, bevor schließlich die orderBy-Methode verwendet wird, um den resultierenden DataFrame zu sortieren.
Hinweis: Weitere Informationen zum Arbeiten mit DataFrames in Azure Databricks finden Sie in der Azure Databricks-Dokumentation unter Einführung in DataFrames – Python.
Ausführen von SQL-Code in einer Zelle
-
Obwohl es nützlich ist, SQL-Anweisungen in eine Zelle einzubetten, die PySpark-Code enthält, arbeiten Datenanalysten oft lieber direkt mit SQL. Fügen Sie eine neue Codezelle hinzu, und verwenden Sie diese, um den folgenden Code auszuführen.
df.createOrReplaceTempView("salesorders")
Diese Codezeile erstellt eine temporäre Ansicht, die dann direkt mit SQL-Anweisungen verwendet werden kann.
-
Führen Sie in einer neuen Zelle den folgenden Code aus:
%sql SELECT YEAR(OrderDate) AS OrderYear, SUM((UnitPrice * Quantity) + Tax) AS GrossRevenue FROM salesorders GROUP BY YEAR(OrderDate) ORDER BY OrderYear;
Beachten Sie, Folgendes:
- Die %sql-Zeile am Anfang der Zelle (als „Magic“ bezeichnet) gibt an, dass die Laufzeitumgebung der SQL-Sprache Spark anstelle von PySpark verwendet werden soll, um den Code in dieser Zelle auszuführen.
- Der SQL-Code verweist auf die Ansicht salesorders, die Sie zuvor erstellt haben.
- Die Ausgabe der SQL-Abfrage wird automatisch als Ergebnis unter der Zelle angezeigt.
Hinweis: Weitere Informationen zu Spark SQL und Dataframes finden Sie in der Spark SQL-Dokumentation.
Bereinigung
Wählen Sie zunächst im Azure Databricks-Portal auf der Seite Compute Ihren Cluster und dann ■ Beenden aus, um ihn herunterzufahren.
Wenn Sie die Erkundung von Azure Databricks abgeschlossen haben, löschen Sie die erstellten Ressourcen, um unnötige Azure-Kosten zu vermeiden und Kapazität in Ihrem Abonnement freizugeben.