使用 Azure Databricks 自动执行数据引入和处理
Databricks 作业是一项功能强大的服务,可实现数据引入和处理工作流的自动化。 它支持复杂数据管道的业务流程,其中包括从各种源引入原始数据、使用 Delta Live Tables 转换此数据以及将其保存到 Delta Lake 等任务,以便进一步分析。 借助 Azure Databricks,用户可以自动计划并运行其数据处理任务,确保数据始终是最新的,并可用于决策过程。
完成本实验室大约需要 20 分钟。
预配 Azure Databricks 工作区
提示:如果你已有 Azure Databricks 工作区,则可以跳过此过程并使用现有工作区。
本练习包括一个用于预配新 Azure Databricks 工作区的脚本。 该脚本会尝试在一个区域中创建高级层 Azure Databricks 工作区资源,在该区域中,Azure 订阅具有本练习所需计算核心的充足配额;该脚本假设你的用户帐户在订阅中具有足够的权限来创建 Azure Databricks 工作区资源。 如果脚本由于配额或权限不足失败,可以尝试 在 Azure 门户中以交互方式创建 Azure Databricks 工作区。
-
在 Web 浏览器中,登录到 Azure 门户,网址为
https://portal.azure.com
。 -
使用页面顶部搜索栏右侧的 [>_] 按钮在 Azure 门户中创建新的 Cloud Shell,在出现提示时选择“PowerShell”环境并创建存储。 Cloud Shell 在 Azure 门户底部的窗格中提供命令行界面,如下所示:
注意:如果以前创建了使用 Bash 环境的 Cloud shell,请使用 Cloud Shell 窗格左上角的下拉菜单将其更改为“PowerShell”。
-
请注意,可以通过拖动窗格顶部的分隔条或使用窗格右上角的 —、◻ 或 X 图标来调整 Cloud Shell 的大小,以最小化、最大化和关闭窗格 。 有关如何使用 Azure Cloud Shell 的详细信息,请参阅 Azure Cloud Shell 文档。
-
在 PowerShell 窗格中,输入以下命令以克隆此存储库:
rm -r mslearn-databricks -f git clone https://github.com/MicrosoftLearning/mslearn-databricks
-
克隆存储库后,请输入以下命令以运行 setup.ps1 脚本,以在可用区域中预配 Azure Databricks 工作区:
./mslearn-databricks/setup.ps1
-
如果出现提示,请选择要使用的订阅(仅当有权访问多个 Azure 订阅时才会发生这种情况)。
-
等待脚本完成 - 这通常需要大约 5 分钟,但在某些情况下可能需要更长的时间。 在等待时,请查看 Azure Databricks 文档中的 Delta Lake 简介一文。
创建群集
Azure Databricks 是一个分布式处理平台,可使用 Apache Spark 群集在多个节点上并行处理数据。 每个群集由一个用于协调工作的驱动程序节点和多个用于执行处理任务的工作器节点组成。 在本练习中,将创建一个单节点群集,以最大程度地减少实验室环境中使用的计算资源(在实验室环境中,资源可能会受到限制)。 在生产环境中,通常会创建具有多个工作器节点的群集。
提示:如果 Azure Databricks 工作区中已有一个具有 13.3 LTS ML 或更高运行时版本的群集,则可以使用它来完成此练习并跳过此过程。
-
在 Azure 门户中,浏览到已由脚本创建的 msl-xxxxxxx* 资源组(或包含现有 Azure Databricks 工作区的资源组)
-
选择 Azure Databricks 服务资源(如果已使用安装脚本创建,则名为 databricks-xxxxxxx*)。
-
在工作区的“概述”** 页中,使用“启动工作区”** 按钮在新的浏览器标签页中打开 Azure Databricks 工作区;请在出现提示时登录。
提示:使用 Databricks 工作区门户时,可能会显示各种提示和通知。 消除这些内容,并按照提供的说明完成本练习中的任务。
-
在左侧边栏中,选择“(+) 新建”任务,然后选择“群集”。
- 在“新建群集”页中,使用以下设置创建新群集:
- 群集名称:用户名的群集(默认群集名称)
- 策略:非受限
- 群集模式:单节点
- 访问模式:单用户(选择你的用户帐户)
- Databricks 运行时版本:13.3 LTS(Spark 3.4.1、Scala 2.12)或更高版本
- 使用 Photon 加速:已选择
- 节点类型:Standard_D4ds_v5
- 在处于不活动状态 20 分钟后终止****
-
等待群集创建完成。 这可能需要一到两分钟时间。
注意:如果群集无法启动,则订阅在预配 Azure Databricks 工作区的区域中的配额可能不足。 请参阅 CPU 内核限制阻止创建群集,了解详细信息。 如果发生这种情况,可以尝试删除工作区,并在其他区域创建新工作区。 可以将区域指定为设置脚本的参数,如下所示:
./mslearn-databricks/setup.ps1 eastus
创建笔记本并引入数据
-
在边栏中,使用“(+) 新建”** 链接创建笔记本。 在“连接”** 下拉列表中,选择群集(如果尚未选择)。 如果群集未运行,可能需要一分钟左右才能启动。
-
在笔记本的第一个单元格中输入以下代码,该代码使用 shell 命令将数据文件从 GitHub 下载到群集使用的文件系统中。
%sh rm -r /dbfs/FileStore mkdir /dbfs/FileStore wget -O /dbfs/FileStore/sample_sales_data.csv https://github.com/MicrosoftLearning/mslearn-databricks/raw/main/data/sample_sales_data.csv
-
使用单元格左侧的“▸ 运行单元格”菜单选项来运行该代码**。 然后等待代码运行的 Spark 作业完成。
使用 Azure Databricks 作业自动处理数据
-
新建一个笔记本并将其命名为“数据处理”**,以便日后更轻松地识别。 它将用作任务,以在 Databricks 作业中自动化数据引入和处理工作流。
-
在笔记本的第一个单元中,运行以下代码将数据集加载到数据帧中:
# Load the sample dataset into a DataFrame df = spark.read.csv('/FileStore/*.csv', header=True, inferSchema=True) df.show()
-
在新单元格中,输入以下代码,按产品类别汇总销售数据:
from pyspark.sql.functions import col, sum # Aggregate sales data by product category sales_by_category = df.groupBy('product_category').agg(sum('transaction_amount').alias('total_sales')) sales_by_category.show()
-
在边栏中,使用“(+) 新建”** 链接创建作业**。
-
提供任务的名称,并在“路径”** 字段中指定创建为任务源的笔记本。
-
选择“创建任务”**。
-
在右侧面板中的“计划”中**,可以选择“添加触发器”** 并设置运行作业的计划(例如每日、每周)。 但是,对于本练习,我们将手动执行它。
-
选择“立即运行”。
-
在“作业”面板中选择“运行”** 选项卡并监控作业运行。
-
作业运行成功后,可以在“运行”列表中选择它并验证其输出。
已使用 Azure Databricks 作业成功设置和自动化数据引入和处理。 现在可以扩展此解决方案来处理更复杂的数据管道,并与其他 Azure 服务集成,实现可靠的数据处理体系结构。
清理
在 Azure Databricks 门户的“计算”页上,选择群集,然后选择“■ 终止”以将其关闭。
如果已完成对 Azure Databricks 的探索,则可以删除已创建的资源,以避免产生不必要的 Azure 成本并释放订阅中的容量。