Azure Databricks を使用した Spark 構造化ストリーミングと Delta Lake を使用したリアルタイムのインジェストと処理

Spark Structured Streaming を使用すると、エンドツーエンドのフォールト トレランスを使用してリアルタイムでデータを処理できます。 Delta Lakeは、ACIDトランザクションを提供するストレージ層を追加することでこれを強化し、データの整合性と一貫性を確保します。 クラウド ストレージから Delta Lake にデータを取り込み、Delta Live Tables を使用してストリーミング データ パイプラインを管理および最適化できます。

このラボは完了するまで、約 30 分かかります。

Azure Databricks ワークスペースをプロビジョニングする

ヒント: 既に Azure Databricks ワークスペースがある場合は、この手順をスキップして、既存のワークスペースを使用できます。

この演習には、新しい Azure Databricks ワークスペースをプロビジョニングするスクリプトが含まれています。 このスクリプトは、この演習で必要なコンピューティング コアに対する十分なクォータが Azure サブスクリプションにあるリージョンに、Premium レベルの Azure Databricks ワークスペース リソースを作成しようとします。また、使用するユーザー アカウントのサブスクリプションに、Azure Databricks ワークスペース リソースを作成するための十分なアクセス許可があることを前提としています。 十分なクォータやアクセス許可がないためにスクリプトが失敗した場合は、Azure portal で、Azure Databricks ワークスペースを対話形式で作成してみてください。

  1. Web ブラウザーで、https://portal.azure.comAzure portal にサインインします。

  2. ページ上部の検索バーの右側にある [>_] ボタンを使用して、Azure portal に新しい Cloud Shell を作成します。メッセージが表示されたら、PowerShell 環境を選んで、ストレージを作成します。 次に示すように、Azure portal の下部にあるペインに、Cloud Shell のコマンド ライン インターフェイスが表示されます。

    Azure portal と Cloud Shell のペイン

    : 前に Bash 環境を使ってクラウド シェルを作成している場合は、そのクラウド シェル ペインの左上にあるドロップダウン メニューを使って、PowerShell に変更します。

  3. ペインの上部にある区分線をドラッグして Cloud Shell のサイズを変更したり、ペインの右上にある X アイコンを使用して、ペインを最小化または最大化したり、閉じたりすることができます。 Azure Cloud Shell の使い方について詳しくは、Azure Cloud Shell のドキュメントをご覧ください。

  4. PowerShell のペインで、次のコマンドを入力して、リポジトリを複製します。

     rm -r mslearn-databricks -f
     git clone https://github.com/MicrosoftLearning/mslearn-databricks
    
  5. リポジトリをクローンした後、次のコマンドを入力して setup.ps1 スクリプトを実行します。これにより、使用可能なリージョンに Azure Databricks ワークスペースがプロビジョニングされます。

     ./mslearn-databricks/setup.ps1
    
  6. メッセージが表示された場合は、使用するサブスクリプションを選択します (これは、複数の Azure サブスクリプションへのアクセス権を持っている場合にのみ行います)。

  7. スクリプトが完了するまで待ちます。通常、約 5 分かかりますが、さらに時間がかかる場合もあります。 待っている間に、Azure Databricks ドキュメントのDelta Lake の概要に関する記事をご確認ください。

クラスターの作成

Azure Databricks は、Apache Spark “クラスター” を使用して複数のノードでデータを並列に処理する分散処理プラットフォームです。** 各クラスターは、作業を調整するドライバー ノードと、処理タスクを実行するワーカー ノードで構成されています。 この演習では、ラボ環境で使用されるコンピューティング リソース (リソースが制約される場合がある) を最小限に抑えるために、単一ノード クラスターを作成します。 運用環境では、通常、複数のワーカー ノードを含むクラスターを作成します。

ヒント: Azure Databricks ワークスペースに 13.3 LTS 以降のランタイム バージョンを持つクラスターが既にある場合は、それを使ってこの演習を完了し、この手順をスキップできます。

  1. Azure portal で、スクリプトによって作成された msl-xxxxxxx リソース グループ (または既存の Azure Databricks ワークスペースを含むリソース グループ) に移動します

  2. Azure Databricks Service リソース (セットアップ スクリプトを使って作成した場合は、databricks-xxxxxxx という名前) を選択します。

  3. Azure Databricks ワークスペースの [概要] ページで、[ワークスペースの起動] ボタンを使用して、新しいブラウザー タブで Azure Databricks ワークスペースを開きます。サインインを求められた場合はサインインします。

    ヒント: Databricks ワークスペース ポータルを使用すると、さまざまなヒントと通知が表示される場合があります。 これらは無視し、指示に従ってこの演習のタスクを完了してください。

  4. 左側のサイドバーで、[(+) 新規] タスクを選択し、[クラスター] を選択します。

  5. [新しいクラスター] ページで、次の設定を使用して新しいクラスターを作成します。
    • クラスター名: “ユーザー名の” クラスター (既定のクラスター名)**
    • ポリシー:Unrestricted
    • クラスター モード: 単一ノード
    • アクセス モード: 単一ユーザー (自分のユーザー アカウントを選択)
    • Databricks Runtime のバージョン: 13.3 LTS (Spark 3.4.1、Scala 2.12) 以降
    • Photon Acceleration を使用する: 選択済み
    • ノード タイプ: Standard_D4ds_v5
    • 非アクティブ状態が ** 20 ** 分間続いた後終了する
  6. クラスターが作成されるまで待ちます。 これには 1、2 分かかることがあります。

    : クラスターの起動に失敗した場合、Azure Databricks ワークスペースがプロビジョニングされているリージョンでサブスクリプションのクォータが不足していることがあります。 詳細については、「CPU コアの制限によってクラスターを作成できない」を参照してください。 その場合は、ワークスペースを削除し、別のリージョンに新しいワークスペースを作成してみてください。 次のように、セットアップ スクリプトのパラメーターとしてリージョンを指定できます: ./mslearn-databricks/setup.ps1 eastus

ノートブックを作成してデータを取り込む

Azure Databricks ワークスペースにノートブックを作成して、さまざまなプログラミング言語で記述されたコードを実行できます。 この演習では、ファイルからデータを取り込んで Databricks File System (DBFS) のフォルダーに保存する簡単なノートブックを作成します。

  1. 次に、Azure Databricks ワークスペース ポータルを表示し、左側のサイド バーに、実行できるさまざまなタスクのアイコンが含まれていることに注目します。

  2. サイド バーで [(+) 新規] タスクを使用して、Notebook を作成します。

  3. 既定のノートブック名 (無題のノートブック [日付]) を RealTimeIngestion に変更します。

  4. ノートブックの最初のセルに次のコードを入力します。このコードは、”シェル” コマンドを使用して、GitHub からクラスターで使用されるファイル システムにデータ ファイルをダウンロードします。**

     %sh
     rm -r /dbfs/device_stream
     mkdir /dbfs/device_stream
     wget -O /dbfs/device_stream/devices1.json https://raw.githubusercontent.com/MicrosoftLearning/mslearn-databricks/main/data/devices1.json
    
  5. セルの左側にある [▸ セルの実行] メニュー オプションを使用して実行を行います。 そして、コードによって実行される Spark ジョブが完了するまで待ちます。

ストリーミング データにデルタ テーブルを使用する

Delta Lake では、”ストリーミング” データがサポートされています。 デルタ テーブルは、Spark 構造化ストリーミング API を使用して作成されたデータ ストリームの “シンク” または “ソース” に指定できます。** ** この例では、モノのインターネット (IoT) のシミュレーション シナリオで、一部のストリーミング データのシンクにデルタ テーブルを使用します。 シミュレートされたデバイス データは、次のような JSON 形式です。

{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev2","status":"error"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"error"}
{"device":"Dev2","status":"ok"}
{"device":"Dev2","status":"error"}
{"device":"Dev1","status":"ok"}
  1. 新しいセルで、次のコードを実行して、JSON デバイス データを含むフォルダーに基づいてストリームを作成します。

    from pyspark.sql.types import *
    from pyspark.sql.functions import *
       
    # Create a stream that reads data from the folder, using a JSON schema
    inputPath = '/device_stream/'
    jsonSchema = StructType([
    StructField("device", StringType(), False),
    StructField("status", StringType(), False)
    ])
    iotstream = spark.readStream.schema(jsonSchema).option("maxFilesPerTrigger", 1).json(inputPath)
    print("Source stream created...")
    
  2. 新しいコード セルを追加し、それを使用して、データのストリームをデルタ フォルダーに永続的に書き込みます。

    # Write the stream to a delta table
    delta_stream_table_path = '/delta/iotdevicedata'
    checkpointpath = '/delta/checkpoint'
    deltastream = iotstream.writeStream.format("delta").option("checkpointLocation", checkpointpath).start(delta_stream_table_path)
    print("Streaming to delta sink...")
    
  3. 他の差分フォルダーと同様に、データを読み取るコードを追加します。

    # Read the data in delta format into a dataframe
    df = spark.read.format("delta").load(delta_stream_table_path)
    display(df)
    
  4. 次のコードを追加して、ストリーミング データの書き込み先となる差分フォルダーに基づいてテーブルを作成します。

    # create a catalog table based on the streaming sink
    spark.sql("CREATE TABLE IotDeviceData USING DELTA LOCATION '{0}'".format(delta_stream_table_path))
    
  5. 次のコードを使用してテーブルにクエリを実行します。

    %sql
    SELECT * FROM IotDeviceData;
    
  6. 次のコードを実行して、新しいデバイス データをストリームに追加します。

     %sh
     wget -O /dbfs/device_stream/devices2.json https://raw.githubusercontent.com/MicrosoftLearning/mslearn-databricks/main/data/devices2.json
    
  7. 次の SQL クエリ コードを再実行して、新しいデータがストリームに追加され、差分フォルダーに書き込まれたことを確認します。

    %sql
    SELECT * FROM IotDeviceData;
    
  8. 次のコードを実行してストリームを停止します。

    deltastream.stop()
    

クリーンアップ

Azure Databricks ポータルの [コンピューティング] ページでクラスターを選択し、[■ 終了] を選択してクラスターをシャットダウンします。

Azure Databricks を調べ終わったら、作成したリソースを削除できます。これにより、不要な Azure コストが生じないようになり、サブスクリプションの容量も解放されます。