Azure Databricks を使用したデータ インジェストと処理の自動化
Databricks ジョブは、データ インジェストと処理ワークフローの自動化を可能にする強力なサービスです。 これにより、複雑なデータ パイプラインのオーケストレーションが可能になります。これには、さまざまなソースからの生データの取り込み、Delta Live Tables を使用したこのデータの変換、さらに分析のために Delta Lake への永続化などのタスクが含まれます。 Azure Databricks を使用すると、ユーザーはデータ処理タスクを自動的にスケジュールして実行できるため、データが常に最新であり、意思決定プロセスで使用できるようになります。
このラボは完了するまで、約 20 分かかります。
Azure Databricks ワークスペースをプロビジョニングする
ヒント: 既に Azure Databricks ワークスペースがある場合は、この手順をスキップして、既存のワークスペースを使用できます。
この演習には、新しい Azure Databricks ワークスペースをプロビジョニングするスクリプトが含まれています。 このスクリプトは、この演習で必要なコンピューティング コアに対する十分なクォータが Azure サブスクリプションにあるリージョンに、Premium レベルの Azure Databricks ワークスペース リソースを作成しようとします。また、使用するユーザー アカウントのサブスクリプションに、Azure Databricks ワークスペース リソースを作成するための十分なアクセス許可があることを前提としています。 十分なクォータやアクセス許可がないためにスクリプトが失敗した場合は、Azure portal で、Azure Databricks ワークスペースを対話形式で作成してみてください。
-
Web ブラウザーで、
https://portal.azure.com
の Azure portal にサインインします。 -
ページ上部の検索バーの右側にある [>_] ボタンを使用して、Azure portal に新しい Cloud Shell を作成します。メッセージが表示されたら、PowerShell 環境を選んで、ストレージを作成します。 次に示すように、Azure portal の下部にあるペインに、Cloud Shell のコマンド ライン インターフェイスが表示されます。
注: 前に Bash 環境を使ってクラウド シェルを作成している場合は、そのクラウド シェル ペインの左上にあるドロップダウン メニューを使って、PowerShell に変更します。
-
ペインの上部にある区分線をドラッグして Cloud Shell のサイズを変更したり、ペインの右上にある — 、 ◻ 、X アイコンを使用して、ペインを最小化または最大化したり、閉じたりすることができます。 Azure Cloud Shell の使い方について詳しくは、Azure Cloud Shell のドキュメントをご覧ください。
-
PowerShell のペインで、次のコマンドを入力して、リポジトリを複製します。
rm -r mslearn-databricks -f git clone https://github.com/MicrosoftLearning/mslearn-databricks
-
リポジトリをクローンした後、次のコマンドを入力して setup.ps1 スクリプトを実行します。これにより、使用可能なリージョンに Azure Databricks ワークスペースがプロビジョニングされます。
./mslearn-databricks/setup.ps1
-
メッセージが表示された場合は、使用するサブスクリプションを選択します (これは、複数の Azure サブスクリプションへのアクセス権を持っている場合にのみ行います)。
-
スクリプトが完了するまで待ちます。通常、約 5 分かかりますが、さらに時間がかかる場合もあります。 待っている間に、Azure Databricks ドキュメントのDelta Lake の概要に関する記事をご確認ください。
クラスターの作成
Azure Databricks は、Apache Spark “クラスター” を使用して複数のノードでデータを並列に処理する分散処理プラットフォームです。** 各クラスターは、作業を調整するドライバー ノードと、処理タスクを実行するワーカー ノードで構成されています。 この演習では、ラボ環境で使用されるコンピューティング リソース (リソースが制約される場合がある) を最小限に抑えるために、単一ノード クラスターを作成します。 運用環境では、通常、複数のワーカー ノードを含むクラスターを作成します。
ヒント: Azure Databricks ワークスペースに 13.3 LTS 以降のランタイム バージョンを持つクラスターが既にある場合は、それを使ってこの演習を完了し、この手順をスキップできます。
-
Azure portal で、スクリプトによって作成された msl-xxxxxxx リソース グループ (または既存の Azure Databricks ワークスペースを含むリソース グループ) に移動します
-
Azure Databricks Service リソース (セットアップ スクリプトを使って作成した場合は、databricks-xxxxxxx という名前) を選択します。
-
Azure Databricks ワークスペースの [概要] ページで、[ワークスペースの起動] ボタンを使用して、新しいブラウザー タブで Azure Databricks ワークスペースを開きます。サインインを求められた場合はサインインします。
ヒント: Databricks ワークスペース ポータルを使用すると、さまざまなヒントと通知が表示される場合があります。 これらは無視し、指示に従ってこの演習のタスクを完了してください。
-
左側のサイドバーで、[(+) 新規] タスクを選択し、[クラスター] を選択します。
- [新しいクラスター] ページで、次の設定を使用して新しいクラスターを作成します。
- クラスター名: “ユーザー名の” クラスター (既定のクラスター名)**
- ポリシー:Unrestricted
- クラスター モード: 単一ノード
- アクセス モード: 単一ユーザー (自分のユーザー アカウントを選択)
- Databricks Runtime のバージョン: 13.3 LTS (Spark 3.4.1、Scala 2.12) 以降
- Photon Acceleration を使用する: 選択済み
- ノード タイプ: Standard_D4ds_v5
- 非アクティブ状態が ** 20 ** 分間続いた後終了する
-
クラスターが作成されるまで待ちます。 これには 1、2 分かかることがあります。
注: クラスターの起動に失敗した場合、Azure Databricks ワークスペースがプロビジョニングされているリージョンでサブスクリプションのクォータが不足していることがあります。 詳細については、「CPU コアの制限によってクラスターを作成できない」を参照してください。 その場合は、ワークスペースを削除し、別のリージョンに新しいワークスペースを作成してみてください。 次のように、セットアップ スクリプトのパラメーターとしてリージョンを指定できます:
./mslearn-databricks/setup.ps1 eastus
ノートブックを作成してデータを取り込む
-
サイド バーで [(+) 新規] タスクを使用して、Notebook を作成します。 [接続] ドロップダウン リストで、まだ選択されていない場合はクラスターを選択します。 クラスターが実行されていない場合は、起動に 1 分ほどかかる場合があります。
-
ノートブックの最初のセルに次のコードを入力します。このコードは、”シェル” コマンドを使用して、GitHub からクラスターで使用されるファイル システムにデータ ファイルをダウンロードします。**
%sh rm -r /dbfs/FileStore mkdir /dbfs/FileStore wget -O /dbfs/FileStore/sample_sales_data.csv https://github.com/MicrosoftLearning/mslearn-databricks/raw/main/data/sample_sales_data.csv
-
セルの左側にある [▸ セルの実行] メニュー オプションを使用して実行を行います。 そして、コードによって実行される Spark ジョブが完了するまで待ちます。
Azure Databricks ジョブによるデータ処理の自動化
-
新しいノートブックを作成し、後で簡単に識別できるようにデータ処理という名前を付けます。 これは、Databricks ジョブのデータ インジェストと処理ワークフローを自動化するタスクとして使用されます。
-
ノートブックの最初のセルで、次のコードを実行してデータセットをデータフレームに読み込みます。
# Load the sample dataset into a DataFrame df = spark.read.csv('/FileStore/*.csv', header=True, inferSchema=True) df.show()
-
新しいセルに、次のコードを入力して、製品カテゴリ別に売上データを集計します。
from pyspark.sql.functions import col, sum # Aggregate sales data by product category sales_by_category = df.groupBy('product_category').agg(sum('transaction_amount').alias('total_sales')) sales_by_category.show()
-
サイド バーで [(+) 新規] リンクを使用して、ジョブを作成します。
-
タスクの名前を指定し、作成したノートブックをタスクのソースとして Path フィールドに指定します。
-
[タスクの作成] を選択します。
-
右側のパネルのスケジュールの下で、[トリガーを追加] を選択し、ジョブの実行スケジュール(例:毎日、毎週)を設定できます。 ただし、この演習では、手動で実行します。
-
[今すぐ実行] を選択します。
-
[ジョブ] パネルで [実行] タブを選択し、ジョブの実行を監視します。
-
ジョブの実行が成功したら、[実行] リストでジョブを選択し、その出力を確認できます。
Azure Databricks ジョブを使用して、データインジェストと処理を正常に設定し、自動化しました。 このソリューションをスケーリングして、より複雑なデータ パイプラインを処理し、他の Azure サービスと統合して堅牢なデータ処理アーキテクチャを実現できるようになりました。
クリーンアップ
Azure Databricks ポータルの [コンピューティング] ページでクラスターを選択し、[■ 終了] を選択してクラスターをシャットダウンします。
Azure Databricks を調べ終わったら、作成したリソースを削除できます。これにより、不要な Azure コストが生じないようになり、サブスクリプションの容量も解放されます。