Ingestion et traitement en temps réel avec Spark Structured Streaming et Delta Lake avec Azure Databricks

Spark Structured Streaming vous permet de traiter des données en temps réel avec une tolérance de panne de bout en bout. Delta Lake améliore cela en fournissant une couche de stockage avec des transactions ACID, garantissant ainsi l’intégrité et la cohérence des données. Vous pouvez ingérer des données à partir du stockage cloud dans Delta Lake et utiliser Delta Live Tables pour gérer et optimiser vos pipelines de données de diffusion.

Ce labo prend environ 30 minutes.

Provisionner un espace de travail Azure Databricks

Conseil : Si vous disposez déjà d’un espace de travail Azure Databricks, vous pouvez ignorer cette procédure et utiliser votre espace de travail existant.

Cet exercice inclut un script permettant d’approvisionner un nouvel espace de travail Azure Databricks. Le script tente de créer une ressource d’espace de travail Azure Databricks de niveau Premium dans une région dans laquelle votre abonnement Azure dispose d’un quota suffisant pour les cœurs de calcul requis dans cet exercice ; et suppose que votre compte d’utilisateur dispose des autorisations suffisantes dans l’abonnement pour créer une ressource d’espace de travail Azure Databricks. Si le script échoue en raison d’un quota insuffisant ou d’autorisations insuffisantes, vous pouvez essayer de créer un espace de travail Azure Databricks de manière interactive dans le portail Azure.

  1. Dans un navigateur web, connectez-vous au portail Azure à l’adresse https://portal.azure.com.

  2. Utilisez le bouton [>_] à droite de la barre de recherche, en haut de la page, pour créer un environnement Cloud Shell dans le portail Azure, en sélectionnant un environnement PowerShell et en créant le stockage si vous y êtes invité. Cloud Shell fournit une interface de ligne de commande dans un volet situé en bas du portail Azure, comme illustré ici :

    Portail Azure avec un volet Cloud Shell

    Remarque : si vous avez créé un shell cloud qui utilise un environnement Bash, utilisez le menu déroulant en haut à gauche du volet Cloud Shell pour le remplacer par PowerShell.

  3. Notez que vous pouvez redimensionner le volet Cloud Shell en faisant glisser la barre de séparation en haut du volet. Vous pouvez aussi utiliser les icônes , et X situées en haut à droite du volet pour réduire, agrandir et fermer le volet. Pour plus d’informations sur l’utilisation d’Azure Cloud Shell, consultez la documentation Azure Cloud Shell.

  4. Dans le volet PowerShell, entrez les commandes suivantes pour cloner ce référentiel :

     rm -r mslearn-databricks -f
     git clone https://github.com/MicrosoftLearning/mslearn-databricks
    
  5. Une fois le référentiel cloné, entrez la commande suivante pour exécuter le script setup.ps1, qui approvisionne un espace de travail Azure Databricks dans une région disponible :

     ./mslearn-databricks/setup.ps1
    
  6. Si vous y êtes invité, choisissez l’abonnement à utiliser (uniquement si vous avez accès à plusieurs abonnements Azure).

  7. Attendez que le script se termine. Cela prend généralement environ 5 minutes, mais dans certains cas, cela peut prendre plus de temps. Pendant que vous attendez, consultez l’article Présentation de Delta Lake dans la documentation Azure Databricks.

Créer un cluster

Azure Databricks est une plateforme de traitement distribuée qui utilise des clusters Apache Spark pour traiter des données en parallèle sur plusieurs nœuds. Chaque cluster se compose d’un nœud de pilote pour coordonner le travail et les nœuds Worker pour effectuer des tâches de traitement. Dans cet exercice, vous allez créer un cluster à nœud unique pour réduire les ressources de calcul utilisées dans l’environnement du labo (dans lequel les ressources peuvent être limitées). Dans un environnement de production, vous créez généralement un cluster avec plusieurs nœuds Worker.

Conseil : Si vous disposez déjà d’un cluster avec une version 13.3 LTS ou ultérieure du runtime dans votre espace de travail Azure Databricks, vous pouvez l’utiliser pour effectuer cet exercice et ignorer cette procédure.

  1. Dans le Portail Microsoft Azure, accédez au groupe de ressources msl-xxxxxxx créé par le script (ou le groupe de ressources contenant votre espace de travail Azure Databricks existant)

  2. Sélectionnez votre ressource de service Azure Databricks (nommée databricks-xxxxxxx si vous avez utilisé le script d’installation pour la créer).

  3. Dans la page Vue d’ensemble de votre espace de travail, utilisez le bouton Lancer l’espace de travail pour ouvrir votre espace de travail Azure Databricks dans un nouvel onglet de navigateur et connectez-vous si vous y êtes invité.

    Conseil : lorsque vous utilisez le portail de l’espace de travail Databricks, plusieurs conseils et notifications peuvent s’afficher. Ignorez-les et suivez les instructions fournies pour effectuer les tâches de cet exercice.

  4. Dans la barre latérale située à gauche, sélectionnez la tâche (+) Nouveau, puis sélectionnez Cluster.

  5. Dans la page Nouveau cluster, créez un cluster avec les paramètres suivants :
    • Nom du cluster : cluster de nom d’utilisateur (nom de cluster par défaut)
    • Stratégie : Non restreint
    • Mode cluster : nœud unique
    • Mode d’accès : un seul utilisateur (avec votre compte d’utilisateur sélectionné)
    • Version du runtime Databricks : 13.3 LTS (Spark 3.4.1, Scala 2.12) ou version ultérieure
    • Utiliser l’accélération photon : sélectionné
    • Type de nœud : Standard_D4ds_v5
    • Arrêter après 20 minutes d’inactivité
  6. Attendez que le cluster soit créé. Cette opération peut prendre une à deux minutes.

    Remarque : si votre cluster ne démarre pas, le quota de votre abonnement est peut-être insuffisant dans la région où votre espace de travail Azure Databricks est approvisionné. Pour plus d’informations, consultez l’article La limite de cœurs du processeur empêche la création du cluster. Si cela se produit, vous pouvez essayer de supprimer votre espace de travail et d’en créer un dans une autre région. Vous pouvez spécifier une région comme paramètre pour le script d’installation comme suit : ./mslearn-databricks/setup.ps1 eastus

Créer un notebook et ingérer des données

Vous pouvez créer des notebooks dans votre espace de travail Azure Databricks pour exécuter du code écrit dans divers langages de programmation. Dans cet exercice, vous allez créer un notebook simple qui ingère des données à partir d’un fichier et les enregistre dans un dossier du système de fichiers Databricks (DBFS).

  1. Affichez le portail de l’espace de travail Azure Databricks et notez que la barre latérale gauche contient des icônes indiquant les différentes tâches que vous pouvez effectuer.

  2. Dans la barre latérale, cliquez sur le lien (+) Nouveau pour créer un notebook.

  3. Modifiez le nom du notebook par défaut (Notebook sans titre [date]) en IngestionEnTempsRéel.

  4. Dans la première cellule du notebook, entrez le code suivant, qui utilise des commandes du shell pour télécharger des fichiers de données depuis GitHub dans le système de fichiers utilisé par votre cluster.

     %sh
     rm -r /dbfs/device_stream
     mkdir /dbfs/device_stream
     wget -O /dbfs/device_stream/devices1.json https://raw.githubusercontent.com/MicrosoftLearning/mslearn-databricks/main/data/devices1.json
    
  5. Utilisez l’option de menu ▸ Exécuter la cellule à gauche de la cellule pour l’exécuter. Attendez ensuite que le travail Spark s’exécute par le code.

Utiliser des tables delta pour les données de streaming

Delta Lake prend en charge les données de diffusion en continu. Les tables delta peuvent être un récepteur ou une source pour des flux de données créés en utilisant l’API Spark Structured Streaming. Dans cet exemple, vous allez utiliser une table delta comme récepteur pour des données de streaming dans un scénario IoT (Internet des objets) simulé. Les données d’appareil simulé sont au format JSON, comme suit :

{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev2","status":"error"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"error"}
{"device":"Dev2","status":"ok"}
{"device":"Dev2","status":"error"}
{"device":"Dev1","status":"ok"}
  1. Dans une nouvelle cellule, exécutez le code suivant pour créer un flux en fonction du dossier contenant les données de l’appareil JSON :

    from pyspark.sql.types import *
    from pyspark.sql.functions import *
       
    # Create a stream that reads data from the folder, using a JSON schema
    inputPath = '/device_stream/'
    jsonSchema = StructType([
    StructField("device", StringType(), False),
    StructField("status", StringType(), False)
    ])
    iotstream = spark.readStream.schema(jsonSchema).option("maxFilesPerTrigger", 1).json(inputPath)
    print("Source stream created...")
    
  2. Ajoutez une nouvelle cellule de code et utilisez-la pour écrire perpétuellement le flux de données dans un dossier delta :

    # Write the stream to a delta table
    delta_stream_table_path = '/delta/iotdevicedata'
    checkpointpath = '/delta/checkpoint'
    deltastream = iotstream.writeStream.format("delta").option("checkpointLocation", checkpointpath).start(delta_stream_table_path)
    print("Streaming to delta sink...")
    
  3. Ajoutez du code pour lire les données, comme n’importe quel autre dossier delta :

    # Read the data in delta format into a dataframe
    df = spark.read.format("delta").load(delta_stream_table_path)
    display(df)
    
  4. Ajoutez le code suivant pour créer une table basée sur le dossier delta dans lequel les données de diffusion en continu sont écrites :

    # create a catalog table based on the streaming sink
    spark.sql("CREATE TABLE IotDeviceData USING DELTA LOCATION '{0}'".format(delta_stream_table_path))
    
  5. Utilisez le code suivant pour interroger la table :

    %sql
    SELECT * FROM IotDeviceData;
    
  6. Exécutez le code suivant pour ajouter des données d’appareil fraîches au flux :

     %sh
     wget -O /dbfs/device_stream/devices2.json https://raw.githubusercontent.com/MicrosoftLearning/mslearn-databricks/main/data/devices2.json
    
  7. Réexécutez le code de requête SQL suivant pour vérifier que les nouvelles données ont été ajoutées au flux et écrites dans le dossier delta :

    %sql
    SELECT * FROM IotDeviceData;
    
  8. Exécutez le code suivant pour entraîner un modèle :

    deltastream.stop()
    

Nettoyage

Dans le portail Azure Databricks, sur la page Calcul, sélectionnez votre cluster et sélectionnez ■ Arrêter pour l’arrêter.

Si vous avez terminé d’explorer Azure Databricks, vous pouvez supprimer les ressources que vous avez créées pour éviter les coûts Azure inutiles et libérer de la capacité dans votre abonnement.