Delta Live Tables を使用してデータ パイプラインを作成する

Delta Live Tables は、信頼性、保守性、テスト可能性に優れたデータ処理パイプラインを構築するための宣言型フレームワークです。 パイプラインは、Delta Live Tables でデータ処理ワークフローの構成と実行に使用されるメイン ユニットです。 Python または SQL で宣言された有向非巡回グラフ (DAG) を使用して、データ ソースをターゲット データセットにリンクします。

このラボは完了するまで、約 40 分かかります。

: Azure Databricks ユーザー インターフェイスは継続的な改善の対象となります。 この演習の手順が記述されてから、ユーザー インターフェイスが変更されている場合があります。

Azure Databricks ワークスペースをプロビジョニングする

ヒント: 既に Azure Databricks ワークスペースがある場合は、この手順をスキップして、既存のワークスペースを使用できます。

この演習には、新しい Azure Databricks ワークスペースをプロビジョニングするスクリプトが含まれています。 このスクリプトは、この演習で必要なコンピューティング コアに対する十分なクォータが Azure サブスクリプションにあるリージョンに、Premium レベルの Azure Databricks ワークスペース リソースを作成しようとします。また、使用するユーザー アカウントのサブスクリプションに、Azure Databricks ワークスペース リソースを作成するための十分なアクセス許可があることを前提としています。 十分なクォータやアクセス許可がないためにスクリプトが失敗した場合は、Azure portal で、Azure Databricks ワークスペースを対話形式で作成してみてください。

  1. Web ブラウザーで、https://portal.azure.comAzure portal にサインインします。
  2. ページ上部の検索バーの右側にある [>_] ボタンを使用して、Azure portal に新しい Cloud Shell を作成します。PowerShell 環境を選択します。 次に示すように、Azure portal の下部にあるペインに、Cloud Shell のコマンド ライン インターフェイスが表示されます。

    Azure portal と Cloud Shell のペイン

    : Bash 環境を使用するクラウド シェルを以前に作成した場合は、それを PowerShell に切り替えます。

  3. ペインの上部にある区分線をドラッグして Cloud Shell のサイズを変更したり、ペインの右上にある X アイコンを使用して、ペインを最小化または最大化したり、閉じたりすることができます。 Azure Cloud Shell の使い方について詳しくは、Azure Cloud Shell のドキュメントをご覧ください。

  4. PowerShell のペインで、次のコマンドを入力して、リポジトリを複製します。

     rm -r mslearn-databricks -f
     git clone https://github.com/MicrosoftLearning/mslearn-databricks
    
  5. リポジトリをクローンした後、次のコマンドを入力して setup.ps1 スクリプトを実行します。これにより、使用可能なリージョンに Azure Databricks ワークスペースがプロビジョニングされます。

     ./mslearn-databricks/setup.ps1
    
  6. メッセージが表示された場合は、使用するサブスクリプションを選択します (これは、複数の Azure サブスクリプションへのアクセス権を持っている場合にのみ行います)。

  7. スクリプトの完了まで待ちます。通常、約 5 分かかりますが、さらに時間がかかる場合もあります。 待っている間、Azure Databricks ドキュメントの記事「Databricks SQL とは」を確認してください。

クラスターの作成

Azure Databricks は、Apache Spark “クラスター” を使用して複数のノードでデータを並列に処理する分散処理プラットフォームです。** 各クラスターは、作業を調整するドライバー ノードと、処理タスクを実行するワーカー ノードで構成されています。 この演習では、ラボ環境で使用されるコンピューティング リソース (リソースが制約される場合がある) を最小限に抑えるために、単一ノード クラスターを作成します。 運用環境では、通常、複数のワーカー ノードを含むクラスターを作成します。

ヒント: Azure Databricks ワークスペースに 13.3 LTS 以降のランタイム バージョンを持つクラスターが既にある場合は、それを使ってこの演習を完了し、この手順をスキップできます。

  1. Azure portal で、スクリプトによって作成された msl-xxxxxxx リソース グループ (または既存の Azure Databricks ワークスペースを含むリソース グループ) に移動します

  2. Azure Databricks Service リソース (セットアップ スクリプトを使って作成した場合は、databricks-xxxxxxx という名前) を選択します。

  3. Azure Databricks ワークスペースの [概要] ページで、[ワークスペースの起動] ボタンを使用して、新しいブラウザー タブで Azure Databricks ワークスペースを開きます。サインインを求められた場合はサインインします。

    ヒント: Databricks ワークスペース ポータルを使用すると、さまざまなヒントと通知が表示される場合があります。 これらは無視し、指示に従ってこの演習のタスクを完了してください。

  4. 左側のサイドバーで、[(+) 新規] タスクを選択し、[クラスター] を選択します ([その他] サブメニューを確認する必要がある場合があります)。

  5. [新しいクラスター] ページで、次の設定を使用して新しいクラスターを作成します。
    • クラスター名: “ユーザー名の” クラスター (既定のクラスター名)**
    • ポリシー:Unrestricted
    • クラスター モード: 単一ノード
    • アクセス モード: 単一ユーザー (自分のユーザー アカウントを選択)
    • Databricks Runtime のバージョン: 13.3 LTS (Spark 3.4.1、Scala 2.12) 以降
    • Photon Acceleration を使用する: 選択済み
    • ノード タイプ: Standard_D4ds_v5
    • 非アクティブ状態が ** 20 ** 分間続いた後終了する
  6. クラスターが作成されるまで待ちます。 これには 1、2 分かかることがあります。

    : クラスターの起動に失敗した場合、Azure Databricks ワークスペースがプロビジョニングされているリージョンでサブスクリプションのクォータが不足していることがあります。 詳細については、「CPU コアの制限によってクラスターを作成できない」を参照してください。 その場合は、ワークスペースを削除し、別のリージョンに新しいワークスペースを作成してみてください。 次のように、セットアップ スクリプトのパラメーターとしてリージョンを指定できます: ./mslearn-databricks/setup.ps1 eastus

ノートブックを作成してデータを取り込む

  1. サイド バーで [(+) 新規] タスクを使用して、Notebook を作成します。

  2. 既定のノートブック名 (無題のノートブック [日付]) を「Create a pipeline with Delta Live tables」に変更し、[接続] ドロップダウン リストでクラスターを選択します (まだ選択されていない場合)。 クラスターが実行されていない場合は、起動に 1 分ほどかかる場合があります。

  3. ノートブックの最初のセルに次のコードを入力します。このコードは、”シェル” コマンドを使用して、GitHub からクラスターで使用されるファイル システムにデータ ファイルをダウンロードします。**

     %sh
     rm -r /dbfs/delta_lab
     mkdir /dbfs/delta_lab
     wget -O /dbfs/delta_lab/covid_data.csv https://github.com/MicrosoftLearning/mslearn-databricks/raw/main/data/covid_data.csv
    
  4. セルの左側にある [▸ セルの実行] メニュー オプションを使用して実行を行います。 そして、コードによって実行される Spark ジョブが完了するまで待ちます。

SQL を使用して Delta Live Tables パイプラインを作成する

  1. 新しいノートブックを作成し、名前を[Pipeline Notebook]に変更します。

  2. ノートブックの名前の横にある Python を選択し、既定の言語を SQL に変更します。

  3. 最初のセルに次のコードを実行せずに配置します。 すべてのセルは、パイプラインの作成後に実行されます。 このコードでは、以前ダウンロードした生データによって設定される Delta Live Table を定義します。

     CREATE OR REFRESH LIVE TABLE raw_covid_data
     COMMENT "COVID sample dataset. This data was ingested from the COVID-19 Data Repository by the Center for Systems Science and Engineering (CSSE) at Johns Hopkins University."
     AS
     SELECT
       Last_Update,
       Country_Region,
       Confirmed,
       Deaths,
       Recovered
     FROM read_files('dbfs:/delta_lab/covid_data.csv', format => 'csv', header => true)
    
  4. 最初のセルの下で、+ コード アイコンを使用して新しいセルを追加し、分析前に前のテーブルのデータのクエリ、フィルター処理、およびフォーマットを行う次のコードを入力します。

     CREATE OR REFRESH LIVE TABLE processed_covid_data(
       CONSTRAINT valid_country_region EXPECT (Country_Region IS NOT NULL) ON VIOLATION FAIL UPDATE
     )
     COMMENT "Formatted and filtered data for analysis."
     AS
     SELECT
         TO_DATE(Last_Update, 'MM/dd/yyyy') as Report_Date,
         Country_Region,
         Confirmed,
         Deaths,
         Recovered
     FROM live.raw_covid_data;
    
  5. 3 番目の新しいコード セルに次のコードを入力します。これにより、パイプラインが正常に実行された後にさらに分析するための強化されたデータ ビューが作成されます。

     CREATE OR REFRESH LIVE TABLE aggregated_covid_data
     COMMENT "Aggregated daily data for the US with total counts."
     AS
     SELECT
         Report_Date,
         sum(Confirmed) as Total_Confirmed,
         sum(Deaths) as Total_Deaths,
         sum(Recovered) as Total_Recovered
     FROM live.processed_covid_data
     GROUP BY Report_Date;
    
  6. 左側のサイドバーで Delta Live Tables を選択し、 [パイプラインの作成] を選択します。

  7. [パイプラインの作成] ページで、次の設定を使用して新しいパイプラインを作成します。
    • パイプライン名: Covid Pipeline
    • 製品エディション: 詳細
    • パイプライン モード: トリガー
    • ソース コードフォルダー内のパイプライン ノートブックノートブックを参照しますUsers/user@name
    • ストレージ オプション: Hive メタストア
    • 保存場所: dbfs:/pipelines/delta_lab
    • ターゲット スキーマ: 入力するdefault
  8. [作成] を選択して、[開始] を選択します。 次に、パイプラインが実行されるまで待機します (時間がかかる場合があります)。

  9. パイプラインが正常に実行された後、最近作成した [Delta Live Tables を使用してパイプラインを作成] ノートブックに戻り、新しいセルで以下のコードを実行して、3つの新しいテーブルのファイルが指定された保存場所に作成されたことを確認します:

     display(dbutils.fs.ls("dbfs:/pipelines/delta_lab/tables"))
    
  10. 別のコード セルを追加し、次のコードを実行して、テーブルが デフォルト データベースに作成されていることを確認します。

     %sql
    
     SHOW TABLES
    

結果を視覚化として表示する

テーブルを作成した後、テーブルをデータフレームに読み込み、データを視覚化することができます。

  1. [Delta Live Tables を使用してパイプラインを作成] ノートブックで、新しいコード セルを追加し、次のコードを実行して aggregated_covid_data をデータフレームに読み込みます。

     %sql
        
     SELECT * FROM aggregated_covid_data
    
  2. 結果の表の上にある [+][視覚化] の順に選択して視覚化エディターを表示し、次のオプションを適用します。
    • 視覚化の種類: 折れ線
    • X 列: Report_Date
    • Y 列: 新しい列を追加して[Total_Confirmed] を選択します。 **合計集計を適用します
  3. 視覚化を保存し、結果のグラフをノートブックに表示します。

クリーンアップ

Azure Databricks ポータルの [コンピューティング] ページでクラスターを選択し、[■ 終了] を選択してクラスターをシャットダウンします。

Azure Databricks を調べ終わったら、作成したリソースを削除できます。これにより、不要な Azure コストが生じないようになり、サブスクリプションの容量も解放されます。