Delta Live Tables を使用してデータ パイプラインを作成する

Delta Live Tables は、信頼性、保守性、テスト可能性に優れたデータ処理パイプラインを構築するための宣言型フレームワークです。 パイプラインは、Delta Live Tables でデータ処理ワークフローの構成と実行に使用されるメイン ユニットです。 Python または SQL で宣言された有向非巡回グラフ (DAG) を使用して、データ ソースをターゲット データセットにリンクします。

このラボは完了するまで、約 40 分かかります。

Azure Databricks ワークスペースをプロビジョニングする

ヒント: 既に Azure Databricks ワークスペースがある場合は、この手順をスキップして、既存のワークスペースを使用できます。

この演習には、新しい Azure Databricks ワークスペースをプロビジョニングするスクリプトが含まれています。 このスクリプトは、この演習で必要なコンピューティング コアに対する十分なクォータが Azure サブスクリプションにあるリージョンに、Premium レベルの Azure Databricks ワークスペース リソースを作成しようとします。また、使用するユーザー アカウントのサブスクリプションに、Azure Databricks ワークスペース リソースを作成するための十分なアクセス許可があることを前提としています。 十分なクォータやアクセス許可がないためにスクリプトが失敗した場合は、Azure portal で、Azure Databricks ワークスペースを対話形式で作成してみてください。

  1. Web ブラウザーで、https://portal.azure.comAzure portal にサインインします。

  2. ページ上部の検索バーの右側にある [>_] ボタンを使用して、Azure portal に新しい Cloud Shell を作成します。メッセージが表示されたら、PowerShell 環境を選んで、ストレージを作成します。 次に示すように、Azure portal の下部にあるペインに、Cloud Shell のコマンド ライン インターフェイスが表示されます。

    Azure portal と Cloud Shell のペイン

    :前に Bash 環境を使ってクラウド シェルを作成した場合は、そのクラウド シェル ペインの左上にあるドロップダウン メニューを使って、PowerShell に変更します。

  3. ペインの上部にある区分線をドラッグして Cloud Shell のサイズを変更したり、ペインの右上にある X アイコンを使用して、ペインを最小化または最大化したり、閉じたりすることができます。 Azure Cloud Shell の使い方について詳しくは、Azure Cloud Shell のドキュメントをご覧ください。

  4. PowerShell のペインで、次のコマンドを入力して、リポジトリを複製します。

     rm -r mslearn-databricks -f
     git clone https://github.com/MicrosoftLearning/mslearn-databricks
    
  5. リポジトリをクローンした後、次のコマンドを入力して setup.ps1 スクリプトを実行します。これにより、使用可能なリージョンに Azure Databricks ワークスペースがプロビジョニングされます。

     ./mslearn-databricks/setup.ps1
    
  6. メッセージが表示された場合は、使用するサブスクリプションを選択します (これは、複数の Azure サブスクリプションへのアクセス権を持っている場合にのみ行います)。

  7. スクリプトが完了するまで待ちます。通常、約 5 分かかりますが、さらに時間がかかる場合もあります。 待っている間に、Azure Databricks ドキュメントのDelta Lake の概要に関する記事をご確認ください。

クラスターの作成

Azure Databricks は、Apache Spark “クラスター” を使用して複数のノードでデータを並列に処理する分散処理プラットフォームです。** 各クラスターは、作業を調整するドライバー ノードと、処理タスクを実行するワーカー ノードで構成されています。 この演習では、ラボ環境で使用されるコンピューティング リソース (リソースが制約される場合がある) を最小限に抑えるために、単一ノード クラスターを作成します。 運用環境では、通常、複数のワーカー ノードを含むクラスターを作成します。

ヒント: Azure Databricks ワークスペースに 13.3 LTS 以降のランタイム バージョンを持つクラスターが既にある場合は、それを使ってこの演習を完了し、この手順をスキップできます。

  1. Azure portal で、スクリプトによって作成された msl-xxxxxxx リソース グループ (または既存の Azure Databricks ワークスペースを含むリソース グループ) に移動します

  2. Azure Databricks Service リソース (セットアップ スクリプトを使って作成した場合は、databricks-xxxxxxx という名前) を選択します。

  3. Azure Databricks ワークスペースの [概要] ページで、[ワークスペースの起動] ボタンを使用して、新しいブラウザー タブで Azure Databricks ワークスペースを開きます。サインインを求められた場合はサインインします。

    ヒント: Databricks ワークスペース ポータルを使用すると、さまざまなヒントと通知が表示される場合があります。 これらは無視し、指示に従ってこの演習のタスクを完了してください。

  4. 左側のサイドバーで、[(+) 新規] タスクを選択し、[クラスター] を選択します。

  5. [新しいクラスター] ページで、次の設定を使用して新しいクラスターを作成します。
    • クラスター名: “ユーザー名の” クラスター (既定のクラスター名)**
    • ポリシー:Unrestricted
    • クラスター モード: 単一ノード
    • アクセス モード: 単一ユーザー (自分のユーザー アカウントを選択)
    • Databricks Runtime のバージョン: 13.3 LTS (Spark 3.4.1、Scala 2.12) 以降
    • Photon Acceleration を使用する: 選択済み
    • ノード タイプ: Standard_D4ds_v5
    • 非アクティブ状態が ** 20 ** 分間続いた後終了する
  6. クラスターが作成されるまで待ちます。 これには 1、2 分かかることがあります。

    : クラスターの起動に失敗した場合、Azure Databricks ワークスペースがプロビジョニングされているリージョンでサブスクリプションのクォータが不足していることがあります。 詳細については、「CPU コアの制限によってクラスターを作成できない」を参照してください。 その場合は、ワークスペースを削除し、別のリージョンに新しいワークスペースを作成してみてください。 次のように、セットアップ スクリプトのパラメーターとしてリージョンを指定できます: ./mslearn-databricks/setup.ps1 eastus

ノートブックを作成してデータを取り込む

  1. サイド バーで [(+) 新規] タスクを使用して、Notebook を作成します。

  2. 既定のノートブック名 (無題のノートブック [日付]) を「Create a pipeline with Delta Live tables」に変更し、[接続] ドロップダウン リストでクラスターを選択します (まだ選択されていない場合)。 クラスターが実行されていない場合は、起動に 1 分ほどかかる場合があります。

  3. ノートブックの最初のセルに次のコードを入力します。このコードは、”シェル” コマンドを使用して、GitHub からクラスターで使用されるファイル システムにデータ ファイルをダウンロードします。**

     %sh
     rm -r /dbfs/delta_lab
     mkdir /dbfs/delta_lab
     wget -O /dbfs/delta_lab/covid_data.csv https://github.com/MicrosoftLearning/mslearn-databricks/raw/main/data/covid_data.csv
    
  4. セルの左側にある [▸ セルの実行] メニュー オプションを使用して実行を行います。 そして、コードによって実行される Spark ジョブが完了するまで待ちます。

SQL を使用して Delta Live Tables パイプラインを作成する

新しいノートブックを作成し、SQL スクリプトを使用して Delta Live Tables の定義を開始します。

  1. ノートブックの名前の横にある Python を選択し、既定の言語を SQL に変更します。

  2. 最初のセルに次のコードを実行せずに配置します。 すべてのセルは、パイプラインの作成後に実行されます。 このコードでは、以前ダウンロードした生データによって設定される Delta Live Table を定義します。

     CREATE OR REFRESH LIVE TABLE raw_covid_data
     COMMENT "COVID sample dataset. This data was ingested from the COVID-19 Data Repository by the Center for Systems Science and Engineering (CSSE) at Johns Hopkins University."
     AS
     SELECT
       Last_Update,
       Country_Region,
       Confirmed,
       Deaths,
       Recovered
     FROM read_files('dbfs:/delta_lab/covid_data.csv', format => 'csv', header => true)
    
  3. 新しいセルを追加し、次のコードを使用して、分析前に前のテーブルのデータのクエリの実行、フィルター処理、書式設定を行います。

     CREATE OR REFRESH LIVE TABLE processed_covid_data(
       CONSTRAINT valid_country_region EXPECT (Country_Region IS NOT NULL) ON VIOLATION FAIL UPDATE
     )
     COMMENT "Formatted and filtered data for analysis."
     AS
     SELECT
         TO_DATE(Last_Update, 'MM/dd/yyyy') as Report_Date,
         Country_Region,
         Confirmed,
         Deaths,
         Recovered
     FROM live.raw_covid_data;
    
  4. 新しいコード セルに、パイプラインが正常に実行されると、さらに分析するためにエンリッチされたデータ ビューを作成する次のコードを配置します。

     CREATE OR REFRESH LIVE TABLE aggregated_covid_data
     COMMENT "Aggregated daily data for the US with total counts."
     AS
     SELECT
         Report_Date,
         sum(Confirmed) as Total_Confirmed,
         sum(Deaths) as Total_Deaths,
         sum(Recovered) as Total_Recovered
     FROM live.processed_covid_data
     GROUP BY Report_Date;
    
  5. 左サイドバーで [Delta Live Tables] を選択し、[パイプラインの作成] を選択します。

  6. [パイプラインの作成] ページで、次の設定を使用して新しいパイプラインを作成します。
    • パイプライン名: パイプラインに名前を付ける
    • 製品エディション: 詳細
    • パイプライン モード: トリガー
    • ソース コード: SQL ノートブックを選択する
    • ストレージ オプション: Hive メタストア
    • ストレージの場所: dbfs:/pipelines/delta_lab
  7. [作成] を選択して、[開始] を選択します。

  8. パイプラインが正常に実行されたら、最初のノートブックに戻り、次のコードを使用して、指定したストレージの場所に 3 つの新しいテーブルがすべて作成されていることを確認します。

     display(dbutils.fs.ls("dbfs:/pipelines/delta_lab/tables"))
    

結果を視覚化として表示する

テーブルを作成した後、テーブルをデータフレームに読み込み、データを視覚化することができます。

  1. 最初のノートブックで、新しいコード セルを追加し、次のコードを実行して aggregated_covid_data をデータフレームに読み込みます。

    df = spark.read.format("delta").load('/pipelines/delta_lab/tables/aggregated_covid_data')
    display(df)
    
  2. 結果の表の上にある [+][視覚化] の順に選択して視覚化エディターを表示し、次のオプションを適用します。
    • 視覚化の種類: 折れ線
    • X 列: Report_Date
    • Y 列: 新しい列を追加して[Total_Confirmed] を選択します。 **合計集計を適用します
  3. 視覚化を保存し、結果のグラフをノートブックに表示します。

クリーンアップ

Azure Databricks ポータルの [コンピューティング] ページでクラスターを選択し、[■ 終了] を選択してクラスターをシャットダウンします。

Azure Databricks を調べ終わったら、作成したリソースを削除できます。これにより、不要な Azure コストが生じないようになり、サブスクリプションの容量も解放されます。