Microsoft Fabric でパイプラインを使用してデータを取り込む
データ レイクハウスは、クラウド規模の分析ソリューション用の一般的な分析データ ストアです。 データ エンジニアの中心的な職務の 1 つは、オペレーショナル データの複数のソースからレイクハウスへのデータ インジェストを実装して管理することです。 Microsoft Fabric では、”パイプライン” を作成することによって、データ インジェストの “抽出、変換、読み込み” (ETL) または “抽出、読み込み、変換” (ELT) ソリューションを実装できます。** ** **
Fabric では、Apache Spark もサポートされるため、データを大規模に処理するコードを作成して実行できます。 Fabric でパイプラインと Spark 機能を組み合わせることで、データを外部ソースからレイクハウスの基になる OneLake ストレージにコピーし、分析のためにテーブルに読み込む前に、Spark コードを使用してカスタム データ変換を実行する複雑なデータ インジェスト ロジックを実装できます。
このラボは完了するまで、約 60 分かかります。
注:この演習を完了するには、 Microsoft Fabric 試用版が必要です。
ワークスペースの作成
Fabric でデータを操作する前に、Fabric 試用版を有効にしてワークスペースを作成してください。
- Microsoft Fabric ホーム ページ (
https://app.fabric.microsoft.com/home?experience=fabric
) で、[Synapse Data Engineering] を選択します。 - 左側のメニュー バーで、 [ワークスペース] を選択します (アイコンは 🗇 に似ています)。
- 任意の名前で新しいワークスペースを作成し、Fabric 容量を含むライセンス モード (“試用版”、Premium、または Fabric) を選択します。**
-
開いた新しいワークスペースは空のはずです。
レイクハウスを作成する
ワークスペースが作成されたので、次にデータを取り込むデータ レイクハウスを作成します。
-
Synapse Data Engineering ホーム ページで、任意の名前で新しい Lakehouse を作成します。
1 分ほどすると、Tables や Files のない新しいレイクハウスが作成されます。
-
左側のペインの [レイク ビュー] タブで、Files ノードの […] メニューにある [新しいサブフォルダー] を選択し、new_data という名前のサブフォルダーを作成します。
パイプラインを作成する
データを簡単に取り込むには、パイプラインでデータのコピー アクティビティを使用して、データをソースから抽出し、レイクハウス内のファイルにコピーします。
- ご自分のレイクハウスの [ホーム] ページで、[データの取得]、[新しいデータ パイプライン] の順に選択し、Ingest Sales Data という名前の新しいデータ パイプラインを作成します。
- “データのコピー” ウィザードが自動的に開かない場合は、パイプライン エディター ページで [データのコピー] - [コピー アシスタントの使用] を選択します。
-
データのコピー ウィザードの [データ ソースの選択] ページで、検索バーに「HTTP」と入力し、[新しいソース] セクションで [HTTP] を選択します。
- [データ ソースへの接続] ウィンドウで、データ ソースへの接続に関する次の設定を入力します。
- URL:
https://raw.githubusercontent.com/MicrosoftLearning/dp-data/main/sales.csv
- 接続: 新しい接続の作成
- 接続名: “一意の名前を指定します”**
- データ ゲートウェイ: (なし)
- 認証の種類: 匿名
- URL:
- [次へ] を選択します。 次に、次の設定が選択されていることを確認します。
- 相対 URL: 空白のまま
- 要求メソッド:GET
- 追加ヘッダー :空白のまま
- バイナリ コピー: 選択解除
- 要求タイムアウト: 空白のまま
- 最大同時接続数: 空白のまま
- [次へ] を選択し、データがサンプリングされるのを待ってから、次の設定が選択されていることを確認します。
- ファイル形式: DelimitedText
- 列区切り記号: コンマ (,)
- 行区切り記号: 改行 (\n)
- 最初の行をヘッダーとして使用: 選択
- 圧縮の種類: なし
- [データのプレビュー] を選択すると、取り込まれたデータのサンプルが表示されます。 次に、データ プレビューを閉じて、 [次へ] を選択します。
- [データのコピー先の選択] ページで、[OneLake データ ハブ] を選択し、既存のレイクハウスを選択します。
- データのコピー先オプションを次のように設定し、 [次へ] を選択します。
- ルート フォルダー: Files
- フォルダーのパス名: new_data
- ファイル名: sales.csv
- コピー動作: なし
- ファイル形式オプションを次のように設定して、 [次へ] を選択します。
- ファイル形式: DelimitedText
- 列区切り記号: コンマ (,)
- 行区切り記号: 改行 (\n)
- ヘッダーをファイルに追加: 選択
- 圧縮の種類: なし
-
[コピーの概要] ページで、コピー操作の詳細を確認し、 [保存と実行] を選択します。
次に示すように、データのコピー アクティビティを含む新しいパイプラインが作成されます。
-
パイプラインの実行が開始されたら、パイプライン デザイナーの [出力] ペインで状態を監視できます。 ↻ ([更新]) アイコンを使用して、状態を更新し、正常に終了するまで待ちます。**
- 左側のメニュー バーで、レイクハウスを選択します。
- [ホーム] ページのレイクハウス エクスプローラー ペインで、Files を展開し、new_data フォルダーを選択して、sales.csv ファイルがコピーされていることを確認します。
ノートブックを作成する
-
レイクハウスの [ホーム] ページの [ノートブックを開く] メニューで、 [新しいノートブック] を選択します。
数秒後に、1 つの ‘‘セル’’ を含む新しいノートブックが開きます。** ノートブックは、’‘コード’’ または ‘‘マークダウン’’ (書式設定されたテキスト) を含むことができる 1 つ以上のセルで構成されます。** **
-
ノートブック内の既存のセルを選択します。これには、単純なコードが含まれています。既定のコードを次の変数宣言に置き換えます。
table_name = "sales"
-
セルの […] メニュー (右上にあります) で、 [パラメーター セルの切り替え] を選択します。 これにより、セルは、パイプラインからノートブックを実行するときに、そのセル内で宣言された変数をパラメーターとして扱うように構成されます。
-
parameters セルの下にある [+ コード] ボタンを使用して新しいコード セルを追加します。 次に、そのセルに次のコードを追加します。
from pyspark.sql.functions import * # Read the new sales data df = spark.read.format("csv").option("header","true").load("Files/new_data/*.csv") ## Add month and year columns df = df.withColumn("Year", year(col("OrderDate"))).withColumn("Month", month(col("OrderDate"))) # Derive FirstName and LastName columns df = df.withColumn("FirstName", split(col("CustomerName"), " ").getItem(0)).withColumn("LastName", split(col("CustomerName"), " ").getItem(1)) # Filter and reorder columns df = df["SalesOrderNumber", "SalesOrderLineNumber", "OrderDate", "Year", "Month", "FirstName", "LastName", "EmailAddress", "Item", "Quantity", "UnitPrice", "TaxAmount"] # Load the data into a table df.write.format("delta").mode("append").saveAsTable(table_name)
このコードは、データのコピー アクティビティによって取り込まれた sales.csv ファイルからデータを読み込み、いくつかの変換ロジックを適用し、変換されたデータをテーブルとして保存します。テーブルが既に存在する場合は、データを追加します。
-
ノートブックが次のようになっていることを確認し、ツールバーの ▷ ([すべて実行]) ボタンを使用して、含まれているすべてのセルを実行します。
注: このセッション内で Spark コードを実行したのはこれが最初であるため、Spark プールを起動する必要があります。 これは、最初のセルが完了するまで 1 分ほどかかる場合があることを意味します。
- ノートブックの実行が完了したら、左側の [レイクハウス エクスプローラー] ペインで、Tables の […] メニューにある [更新] を選択し、sales テーブルが作成されていることを確認します。
- ノートブックのメニュー バーで、[⚙️] (設定) アイコンを使用してノートブックの設定を表示します。 次に、ノートブックの [名前] を Load Sales に設定し、[設定] ペインを閉じます。
- 左側のハブ メニュー バーで、レイクハウスを選択します。
- エクスプローラー ペインで、ビューを更新します。 次に、Tables を展開し、sales テーブルを選択して、それに含まれるデータのプレビューを表示します。
パイプラインを変更する
データを変換してテーブルに読み込むためのノートブックを実装したので、そのノートブックをパイプラインに組み込んで、再利用可能な ETL プロセスを作成できます。
- 左側のハブ メニュー バーで、先ほど作成した Ingest Sales Data パイプラインを選択します。
-
[アクティビティ] タブの [その他のアクティビティ] の一覧で、 [データの削除] を選択します。 次に、次に示すように、新しいデータの削除アクティビティをデータのコピー アクティビティの左側に配置し、その [完了時] 出力をデータのコピー アクティビティに接続します。
- データの削除アクティビティを選択し、デザイン キャンバスの下のペインで、次のプロパティを設定します。
- 全般:
- 名前: Delete old files
- ソース
- [接続]:”自分のレイクハウス”**
- ファイル パスの種類: ワイルドカード ファイル パス
- フォルダー パス: Files / new_data
- ワイルドカード ファイル名: *.csv
- 再帰的: “オンにします”**
- ログの設定:
- ログの有効化: “選択解除します”**
これらの設定により、sales.csv ファイルをコピーする前に、既存の .csv ファイルが確実に削除されます。
- 全般:
- パイプライン デザイナーの [アクティビティ] タブで、 [ノートブック] を選択し、ノートブック アクティビティをパイプラインに追加します。
-
[データのコピー] アクティビティを選択し、次に示すように、その [完了時] 出力をノートブック アクティビティに接続します。
- ノートブック アクティビティを選択し、デザイン キャンバスの下のペインで、次のプロパティを設定します。
- 全般:
- 名前: Load Sales notebook
- 設定:
- ノートブック: Load Sales
-
基本パラメーター: “次のプロパティを指定した新しいパラメーターを追加します”**
名前 Type 値 table_name String new_sales
table_name パラメーターはノートブックに渡され、parameters セル内の table_name 変数に割り当てられている既定値がオーバーライドされます。
- 全般:
-
[ホーム] タブで 🖫 ([保存]) アイコンを使用してパイプラインを保存します。** 次に、 ▷ ([実行]) ボタンを使用してパイプラインを実行し、すべてのアクティビティが完了するのを待ちます。
注: エラー メッセージ “Spark SQL クエリはレイクハウスのコンテキストでのみ使用できます。** 次の手順に進むには、レイクハウスをアタッチしてください” を受け取る場合:ノートブックを開き、左ペインで作成したレイクハウスを選び、[すべてのレイクハウスの削除] を選び、再度追加します。 パイプライン デザイナーに戻り、[▷ 実行] を選びます。
- ポータルの左端にあるハブ メニュー バーで、お使いのレイクハウスを選択します。
- エクスプローラー ペインで、Tables を展開し、new_sales テーブルを選択して、それに含まれるデータのプレビューを表示します。 このテーブルは、パイプラインによって実行されたときにノートブックによって作成されました。
この演習では、パイプラインを使用してデータを外部ソースからレイクハウスにコピーした後、Spark ノートブックを使用してデータを変換し、テーブルに読み込むデータ インジェストを実装しました。
リソースをクリーンアップする
この演習では、Microsoft Fabric でパイプラインを実装する方法を学習しました。
レイクハウスの探索が完了したら、この演習用に作成したワークスペースを削除できます。
- 左側のバーで、ワークスペースのアイコンを選択して、それに含まれるすべての項目を表示します。
- ツール バーの […] メニューで、 [ワークスペースの設定] を選択してください。
- [全般] セクションで、[このワークスペースの削除] を選択します。