Azure 流分析入门
在本练习中,你将在 Azure 订阅中预配 Azure 流分析作业,并使用该作业来查询和汇总实时事件数据流,并将结果存储在 Azure 存储中。
完成此练习大约需要 15 分钟**。
开始之前
需要一个你在其中具有管理级权限的 Azure 订阅。
预配 Azure 资源
在本练习中,你将捕获模拟销售交易数据流,对其进行处理,并将结果存储在 Azure 存储中的 Blob 容器中。 你需要 Azure 事件中心命名空间和 Azure 存储帐户,可将流式处理数据发送到前者,后者用于存储流式处理的结果。
你将组合使用 PowerShell 脚本和 ARM 模板来预配这些资源。
- 登录到 Azure 门户,地址为 。
-
使用页面顶部搜索栏右侧的 [>_] 按钮在 Azure 门户中创建新的 Cloud Shell,在出现提示时选择“PowerShell”环境并创建存储。 Cloud Shell 在 Azure 门户底部的窗格中提供命令行界面,如下所示:
注意:如果以前创建了使用 Bash 环境的 Cloud shell,请使用 Cloud Shell 窗格左上角的下拉菜单将其更改为“PowerShell”。
-
请注意,可以通过拖动窗格顶部的分隔条或使用窗格右上角的 —、◻ 或 X 图标来调整 Cloud Shell 的大小,以最小化、最大化和关闭窗格 。 有关如何使用 Azure Cloud Shell 的详细信息,请参阅 Azure Cloud Shell 文档。
-
在 PowerShell 窗格中,输入以下命令以克隆包含此练习的存储库:
rm -r dp-203 -f git clone https://github.com/MicrosoftLearning/dp-203-azure-data-engineer dp-203
-
克隆存储库后,输入以下命令以更改为此练习的文件夹,然后运行其中包含的 setup.ps1 脚本:
cd dp-203/Allfiles/labs/17 ./setup.ps1
- 如果出现提示,请选择要使用的订阅(仅当有权访问多个 Azure 订阅时才会发生这种情况)。
- 等待脚本完成 - 这通常需要大约 5 分钟,但在某些情况下可能需要更长的时间。 在等待期间,请查看 Azure 流分析文档中的欢迎使用 Azure 流分析一文。
查看流式处理数据源
在创建用于处理实时数据的 Azure 流分析作业之前,让我们看一下它需要查询的数据流。
-
安装脚本运行完毕后,重设 Cloud Shell 窗格的大小或将其最小化,这样你就可以看到 Azure 门户(稍后将返回到 Cloud Shell)。 然后在 Azure 门户中,转到它创建的 dp203-xxxxxxx 资源组,注意该资源组包含 Azure 存储帐户和事件中心命名空间**。
请记下预配资源的“位置”- 稍后,你将在同一位置创建 Azure 流分析作业**。
-
重新打开 Cloud Shell 窗格,并输入以下命令以运行向 Azure 事件中心发送 100 个模拟订单的客户端应用:
node ~/dp-203/Allfiles/labs/17/orderclient
-
在发送时观察销售订单数据,每个订单都包含产品 ID 和数量。 应用将在发送 1000 个订单后结束,这需要一分钟左右的时间。
创建 Azure 流分析作业
现在你已准备好创建 Azure 流分析作业来处理到达事件中心的销售交易数据。
- 在 Azure 门户的“dp203-xxxxxxx” 页面上,选择“+ 创建”,然后搜索
Stream Analytics job
****。 然后,使用以下属性创建“流分析作业”:- 基本信息:
- 订阅:Azure 订阅
- 资源组:选择现有的 dp203-xxxxxxx 资源组**。
- 名称:
process-orders
- 区域:选择在其中预配其他 Azure 资源的区域。
- 托管环境:云
- 流单元:1
- 存储:
- 添加存储帐户:未选中
- 标记:
- 无
- 基本信息:
- 等待部署完成,然后转到部署的流分析作业资源。
为事件流创建输入
Azure 流分析作业必须从记录销售订单的事件中心获取输入数据。
- 在“process-orders”概述页上,选择“添加输入”****。 然后在“输入”页上,使用“添加流输入”菜单添加具有以下属性的“事件中心”输入 :
- 输入别名:
orders
- 从订阅选择事件中心:选中
- 订阅:Azure 订阅
- 事件中心命名空间:选择 eventsxxxxxxx 事件中心命名空间
- 事件中心名称:选择现有的 eventhubxxxxxxx 事件中心。
- 事件中心使用者组:选择现有 $Default 使用者组
- 身份验证模式:创建系统分配的托管标识
- 分区键:留空
- 事件序列化格式:JSON
- 编码:UTF-8
- 输入别名:
- 保存输入并在创建时等待。 你将看到多个通知。 等待“连接测试成功”通知。
为 Blob 存储创建输出
你将以 JSON 格式将聚合的销售订单数据存储在 Azure 存储 Blob 容器中。
- 查看“process-orders”流分析作业的“输出”页****。 然后使用“添加”菜单添加具有以下属性的“Blob 存储/ADLS Gen2”输出 :
- 输出别名:
blobstore
- 从订阅中选择“从订阅中选择 Blob 存储/ADLS Gen2”:选中
- 订阅:Azure 订阅
- 存储帐户:选择 storexxxxxxx 存储帐户**
- 容器:选择现有的数据容器**
- 身份验证模式:托管标识:系统分配
- 事件序列化格式:JSON
- 格式:行分隔
- 编码:UTF-8
- 写入模式:在结果到达时追加
- 路径模式:
{date}
- 日期格式:YYYY/MM/DD
- 时间格式:不适用
- 最小行数:20
- 最大时间:0 小时,1 分钟,0 秒
- 输出别名:
- 保存输出并等待创建。 你将看到多个通知。 等待“连接测试成功”通知。
创建查询
现在你已为 Azure 流分析作业定义了输入和输出,可以使用查询选择、筛选和聚合输入中的数据,并将结果发送到输出。
- 查看“process-orders”流分析作业的“查询”页****。 然后稍等片刻,直到(根据之前在事件中心捕获的销售订单事件)显示输入预览。
- 注意到输入数据包括客户端应用提交的消息中的 ProductID 和 Quantity 字段,还有一些其他事件中心字段,其中包括 EventProcessedUtcTime 字段,它指示事件何时添加到事件中心。
-
按照以下方式修改默认查询:
SELECT DateAdd(second,-10,System.TimeStamp) AS StartTime, System.TimeStamp AS EndTime, ProductID, SUM(Quantity) AS Orders INTO [blobstore] FROM [orders] TIMESTAMP BY EventProcessedUtcTime GROUP BY ProductID, TumblingWindow(second, 10) HAVING COUNT(*) > 1
注意到此查询使用 System.Timestamp(基于 EventProcessedUtcTime 字段)定义每 10 秒“滚动”(非重叠顺序)时段的开始和结束时间,计算该时间段内每个产品 ID 的总数量****。
- 使用“▷ 测试查询”按钮来验证查询,并确保“测试结果”状态指示“成功”(即使未返回任何行)****。
- 保存查询。
运行流式处理作业
现在你已准备好运行作业并处理一些实时销售订单数据。
- 查看“process-orders”流分析作业的“概述”页,并在“属性”选项卡上查看该作业的“输入”、“查询”、“输出”和“函数”**********。 如果“输入”和“输出”数为 0,请使用“概述”页上的“↻ 刷新”按钮显示“orders”输入和“blobstore”输出********。
- 选择“▷ 开始”按钮,然后立即启动流式处理作业。 等待收到流式处理作业已成功启动的通知。
-
重新打开 Cloud Shell 窗格并重新连接(如有必要),然后重新运行以下命令以提交另外 1000 个订单。
node ~/dp-203/Allfiles/labs/17/orderclient
- 运行应用时,在 Azure 门户中,返回到 dp203-xxxxxxx 资源组的页面,然后选择 storexxxxxxxxxxxx 存储帐户****。
- 在存储帐户边栏选项卡左侧的窗格中,选择“容器”选项卡。
- 打开“数据”容器,并使用“↻ 刷新”按钮刷新视图,直到看到名称为当前年份的文件夹****。
- 在“数据”容器中,在文件夹层次结构上导航,其中包括当前年份的文件夹,以及月份和日期的子文件夹**。
- 在小时文件夹中,请注意已创建的文件,该文件的名称应类似于 0_xxxxxxxxxxxxxxxx.json。
-
在文件的“…”菜单(位于文件详细信息右侧)上选择“查看/编辑”,然后查看文件的内容;其中应包含每 10 秒时间段的 JSON 记录,显示为每个产品 ID 处理的订单数,如下所示****:
{"StartTime":"2022-11-23T18:16:25.0000000Z","EndTime":"2022-11-23T18:16:35.0000000Z","ProductID":6,"Orders":13.0} {"StartTime":"2022-11-23T18:16:25.0000000Z","EndTime":"2022-11-23T18:16:35.0000000Z","ProductID":8,"Orders":15.0} {"StartTime":"2022-11-23T18:16:25.0000000Z","EndTime":"2022-11-23T18:16:35.0000000Z","ProductID":5,"Orders":15.0} {"StartTime":"2022-11-23T18:16:25.0000000Z","EndTime":"2022-11-23T18:16:35.0000000Z","ProductID":1,"Orders":16.0} {"StartTime":"2022-11-23T18:16:25.0000000Z","EndTime":"2022-11-23T18:16:35.0000000Z","ProductID":3,"Orders":10.0} {"StartTime":"2022-11-23T18:16:25.0000000Z","EndTime":"2022-11-23T18:16:35.0000000Z","ProductID":2,"Orders":25.0} {"StartTime":"2022-11-23T18:16:25.0000000Z","EndTime":"2022-11-23T18:16:35.0000000Z","ProductID":7,"Orders":13.0} {"StartTime":"2022-11-23T18:16:25.0000000Z","EndTime":"2022-11-23T18:16:35.0000000Z","ProductID":4,"Orders":12.0} {"StartTime":"2022-11-23T18:16:25.0000000Z","EndTime":"2022-11-23T18:16:35.0000000Z","ProductID":10,"Orders":19.0} {"StartTime":"2022-11-23T18:16:25.0000000Z","EndTime":"2022-11-23T18:16:35.0000000Z","ProductID":9,"Orders":8.0} {"StartTime":"2022-11-23T18:16:35.0000000Z","EndTime":"2022-11-23T18:16:45.0000000Z","ProductID":6,"Orders":41.0} {"StartTime":"2022-11-23T18:16:35.0000000Z","EndTime":"2022-11-23T18:16:45.0000000Z","ProductID":8,"Orders":29.0} ...
- 在 Azure Cloud Shell 窗格中,等待订单客户端应用完成。
- 在 Azure 门户中,刷新文件以查看生成的完整结果集。
- 返回到 dp203-xxxxxxx 资源组,并重新打开 process-orders 流分析作业****。
- 在流分析作业页的顶部,使用“⬜ 停止”按钮停止作业,并在出现提示时进行确认。
删除 Azure 资源
你已完成对 Azure 流分析的探索,现在应删除已创建的资源,以避免产生不必要的 Azure 成本。
- 在 Azure 门户的主页上,选择“资源组”。
- 选择包含 Azure 存储、事件中心和流分析资源的 dp203-xxxxxxx 资源组**。
- 在资源组的“概述”页的顶部,选择“删除资源组”。
-
输入 dp203-xxxxxxx 资源组名称以确认要删除该资源组,然后选择“删除” 。
几分钟后,将删除本练习中创建的资源。