Datenverarbeitung in Echtzeit mit Spark Structured Streaming und Delta Lake mit Azure Databricks
Spark Structured Streaming ermöglicht Ihnen die Verarbeitung von Daten in Echtzeit mit End-to-End-Fehlertoleranz. Delta Lake verbessert dies durch die Bereitstellung einer Speicherebene mit ACID-Transaktionen, die die Datenintegrität und Konsistenz sicherstellt. Sie können Daten aus dem Cloudspeicher in Delta Lake erfassen und Delta Live Tables verwenden, um Ihre Streamingdatenpipelinen zu verwalten und zu optimieren.
Dieses Lab dauert ungefähr 30 Minuten.
Hinweis: Die Benutzeroberfläche von Azure Databricks wird kontinuierlich verbessert. Die Benutzeroberfläche kann sich seit der Erstellung der Anweisungen in dieser Übung geändert haben.
Bereitstellen eines Azure Databricks-Arbeitsbereichs
Tipp: Wenn Sie bereits über einen Azure Databricks-Arbeitsbereich verfügen, können Sie dieses Verfahren überspringen und Ihren vorhandenen Arbeitsbereich verwenden.
Diese Übung enthält ein Skript zum Bereitstellen eines neuen Azure Databricks-Arbeitsbereichs. Das Skript versucht, eine Azure Databricks-Arbeitsbereichsressource im Premium-Tarif in einer Region zu erstellen, in der Ihr Azure-Abonnement über ein ausreichendes Kontingent für die in dieser Übung erforderlichen Computekerne verfügt. Es wird davon ausgegangen, dass Ihr Benutzerkonto über ausreichende Berechtigungen im Abonnement verfügt, um eine Azure Databricks-Arbeitsbereichsressource zu erstellen. Wenn das Skript aufgrund unzureichender Kontingente oder Berechtigungen fehlschlägt, können Sie versuchen, einen Azure Databricks-Arbeitsbereich interaktiv im Azure-Portal zu erstellen.
- Melden Sie sich in einem Webbrowser am Azure-Portal unter
https://portal.azure.com
an. -
Verwenden Sie die Taste [>_] rechts neben der Suchleiste oben auf der Seite, um eine neue Cloud Shell im Azure-Portal zu erstellen, und wählen Sie eine PowerShell-Umgebung aus. Die Cloud Shell bietet eine Befehlszeilenschnittstelle in einem Bereich am unteren Rand des Azure-Portals, wie hier gezeigt:
Hinweis: Wenn Sie zuvor eine Cloud-Shell erstellt haben, die eine Bash-Umgebung verwendet, wechseln Sie zu PowerShell.
-
Beachten Sie, dass Sie die Größe der Cloud-Shell ändern können, indem Sie die Trennlinie oben im Bereich ziehen oder die Symbole —, ⤢ und X oben rechts im Bereich verwenden, um den Bereich zu minimieren, zu maximieren und zu schließen. Weitere Informationen zur Verwendung von Azure Cloud Shell finden Sie in der Azure Cloud Shell-Dokumentation.
-
Geben Sie im PowerShell-Bereich die folgenden Befehle ein, um dieses Repository zu klonen:
rm -r mslearn-databricks -f git clone https://github.com/MicrosoftLearning/mslearn-databricks
-
Nachdem das Repository geklont wurde, geben Sie den folgenden Befehl ein, um das Skript setup.ps1 auszuführen, das einen Azure Databricks-Arbeitsbereich in einer verfügbaren Region bereitstellt:
./mslearn-databricks/setup.ps1
-
Wenn Sie dazu aufgefordert werden, wählen Sie aus, welches Abonnement Sie verwenden möchten (dies geschieht nur, wenn Sie Zugriff auf mehrere Azure-Abonnements haben).
- Warten Sie, bis das Skript abgeschlossen ist. Dies dauert in der Regel etwa 5 Minuten, in einigen Fällen kann es jedoch länger dauern. Während Sie warten, lesen Sie den Artikel Abfragen von Streamingdaten in der Azure Databricks-Dokumentation.
Erstellen eines Clusters
Azure Databricks ist eine verteilte Verarbeitungsplattform, die Apache Spark-Cluster verwendet, um Daten parallel auf mehreren Knoten zu verarbeiten. Jeder Cluster besteht aus einem Treiberknoten, um die Arbeit zu koordinieren, und Arbeitsknoten zum Ausführen von Verarbeitungsaufgaben. In dieser Übung erstellen Sie einen Einzelknotencluster , um die in der Lab-Umgebung verwendeten Computeressourcen zu minimieren (in denen Ressourcen möglicherweise eingeschränkt werden). In einer Produktionsumgebung erstellen Sie in der Regel einen Cluster mit mehreren Workerknoten.
Tipp: Wenn Sie bereits über einen Cluster mit einer Runtime 13.3 LTS oder einer höheren Runtimeversion in Ihrem Azure Databricks-Arbeitsbereich verfügen, können Sie ihn verwenden, um diese Übung abzuschließen und dieses Verfahren zu überspringen.
-
Navigieren Sie im Azure-Portal zur Ressourcengruppe msl-xxxxxxx, die vom Skript erstellt wurde (oder zur Ressourcengruppe, die Ihren vorhandenen Azure Databricks-Arbeitsbereich enthält).
-
Wählen Sie die Ressource Ihres Azure Databricks-Diensts aus (sie trägt den Namen databricks-xxxxxxx, wenn Sie das Setupskript zum Erstellen verwendet haben).
-
Verwenden Sie auf der Seite Übersicht für Ihren Arbeitsbereich die Schaltfläche Arbeitsbereich starten, um Ihren Azure Databricks-Arbeitsbereich auf einer neuen Browserregisterkarte zu öffnen. Melden Sie sich an, wenn Sie dazu aufgefordert werden.
Tipp: Während Sie das Databricks-Arbeitsbereichsportal verwenden, werden möglicherweise verschiedene Tipps und Benachrichtigungen angezeigt. Schließen Sie diese, und folgen Sie den Anweisungen, um die Aufgaben in dieser Übung auszuführen.
-
Wählen Sie in der linken Seitenleiste die Option (+) Neue Aufgabe und dann Cluster aus (ggf. im Untermenü Mehr suchen).
- Erstellen Sie auf der Seite Neuer Cluster einen neuen Cluster mit den folgenden Einstellungen:
- Clustername: Cluster des Benutzernamens (der Standardclustername)
- Richtlinie: Unrestricted
- Clustermodus: Einzelknoten
- Zugriffsmodus: Einzelner Benutzer (Ihr Benutzerkonto ist ausgewählt)
- Databricks-Runtimeversion: 13.3 LTS (Spark 3.4.1, Scala 2.12) oder höher
- Photonbeschleunigung verwenden: Ausgewählt
- Knotentyp: Standard_D4ds_v5
- Beenden nach 20 Minuten Inaktivität
-
Warten Sie, bis der Cluster erstellt wurde. Es kann ein oder zwei Minuten dauern.
Hinweis: Wenn Ihr Cluster nicht gestartet werden kann, verfügt Ihr Abonnement möglicherweise über ein unzureichendes Kontingent in der Region, in der Ihr Azure Databricks-Arbeitsbereich bereitgestellt wird. Details finden Sie unter Der Grenzwert für CPU-Kerne verhindert die Clustererstellung. In diesem Fall können Sie versuchen, Ihren Arbeitsbereich zu löschen und in einer anderen Region einen neuen zu erstellen. Sie können einen Bereich als Parameter für das Setupskript wie folgt angeben:
./mslearn-databricks/setup.ps1 eastus
Erstellen eines Notebook und Erfassen von Daten
Sie können Notizbücher in Ihrem Azure Databricks-Arbeitsbereich erstellen, um Code auszuführen, der in einer Reihe von Programmiersprachen geschrieben wurde. In dieser Übung erstellen Sie ein einfaches Notebook, das Daten aus einer Datei erfasst und in einem Ordner im Databricks-Dateisystem (Databricks File System, DBFS) speichert.
-
Zeigen Sie das Azure Databricks-Arbeitsbereichsportal an, und beachten Sie, dass die Randleiste auf der linken Seite Symbole für die verschiedenen Aufgaben enthält, die Sie ausführen können.
-
Verwenden Sie in der Randleiste den Link ** (+) Neu, um ein **Notebook zu erstellen.
-
Ändern Sie den Standardnamen des Notebooks (Unbenanntes Notebook [Datum]) in RealTimeIngestion.
-
Geben Sie in der ersten Zelle des Notebooks den folgenden Code ein, der mit Shellbefehlen die Datendateien von GitHub in das von Ihrem Cluster verwendete Dateisystem herunterlädt.
%sh rm -r /dbfs/device_stream mkdir /dbfs/device_stream wget -O /dbfs/device_stream/devices1.json https://raw.githubusercontent.com/MicrosoftLearning/mslearn-databricks/main/data/devices1.json
-
Verwenden Sie Menüoption ▸ Zelle Ausführen links neben der Zelle, um sie auszuführen. Warten Sie dann, bis der vom Code ausgeführte Spark-Auftrag, abgeschlossen ist.
Verwenden von Delta-Tabellen zum Streamen von Daten
Delta Lake unterstützt das Streamen von Daten. Deltatabellen können eine Senke oder Quelle für Datenströme sein, die mit der Spark Structured Streaming-API erstellt wurden. In diesem Beispiel verwenden Sie eine Deltatabelle als Senke für einige Streamingdaten in einem simulierten IoT-Szenario (Internet der Dinge). Die simulierten Gerätedaten liegen wie folgt im JSON-Format vor:
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev2","status":"error"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"error"}
{"device":"Dev2","status":"ok"}
{"device":"Dev2","status":"error"}
{"device":"Dev1","status":"ok"}
-
Verwenden Sie unter der Ausgabe für die erste Zelle das Symbol + Code, um eine neue Zelle hinzuzufügen. Führen Sie dann den folgenden Code darin aus, um einen Stream auf der Grundlage des Ordners mit den JSON-Gerätedaten zu erstellen:
from pyspark.sql.types import * from pyspark.sql.functions import * # Create a stream that reads data from the folder, using a JSON schema inputPath = '/device_stream/' jsonSchema = StructType([ StructField("device", StringType(), False), StructField("status", StringType(), False) ]) iotstream = spark.readStream.schema(jsonSchema).option("maxFilesPerTrigger", 1).json(inputPath) print("Source stream created...")
-
Fügen Sie eine neue Codezelle hinzu, und verwenden Sie diese, um den Datenstrom dauerhaft in einen Delta-Ordner zu schreiben:
# Write the stream to a delta table delta_stream_table_path = '/delta/iotdevicedata' checkpointpath = '/delta/checkpoint' deltastream = iotstream.writeStream.format("delta").option("checkpointLocation", checkpointpath).start(delta_stream_table_path) print("Streaming to delta sink...")
-
Fügen Sie Code hinzu, um die Daten zu lesen, genau wie bei jedem anderen Delta-Ordner:
# Read the data in delta format into a dataframe df = spark.read.format("delta").load(delta_stream_table_path) display(df)
-
Fügen Sie den folgenden Code hinzu, um eine Tabelle basierend auf dem Delta-Ordner, in den die Streamingdaten geschrieben werden, zu erstellen:
# create a catalog table based on the streaming sink spark.sql("CREATE TABLE IotDeviceData USING DELTA LOCATION '{0}'".format(delta_stream_table_path))
-
Verwenden Sie den folgenden Code, um die Tabelle abzufragen:
%sql SELECT * FROM IotDeviceData;
-
Führen Sie den folgenden Code aus, um dem Datenstrom einige neue Gerätedaten hinzuzufügen:
%sh wget -O /dbfs/device_stream/devices2.json https://raw.githubusercontent.com/MicrosoftLearning/mslearn-databricks/main/data/devices2.json
-
Führen Sie den folgenden SQL-Abfragecode erneut aus, um zu überprüfen, ob die neuen Daten dem Datenstrom hinzugefügt und in den Delta-Ordner geschrieben wurden:
%sql SELECT * FROM IotDeviceData;
-
Führen Sie folgenden Code aus, um den Datenstrom zu stoppen:
deltastream.stop()
Bereinigung
Wählen Sie zunächst im Azure Databricks-Portal auf der Seite Compute Ihren Cluster und dann ■ Beenden aus, um ihn herunterzufahren.
Wenn Sie die Erkundung von Azure Databricks abgeschlossen haben, löschen Sie die erstellten Ressourcen, um unnötige Azure-Kosten zu vermeiden und Kapazität in Ihrem Abonnement freizugeben.