在 Azure Databricks 中使用 Apache Spark
Azure Databricks 是基于 Microsoft Azure 的常用开源 Databricks 平台的一个版本。 Azure Databricks 基于 Apache Spark 构建,为涉及处理文件中数据的数据工程和分析任务提供了高度可缩放的解决方案。 Spark 的优点之一是支持各种编程语言,包括 Java、Scala、Python 和 SQL;这让 Spark 一种非常灵活的数据处理工作负载(包括数据清理和操作、统计分析和机器学习以及数据分析和可视化)解决方案。
完成此练习大约需要 45 分钟。
预配 Azure Databricks 工作区
提示:如果你已有 Azure Databricks 工作区,则可以跳过此过程并使用现有工作区。
本练习包括一个用于预配新 Azure Databricks 工作区的脚本。 该脚本会尝试在一个区域中创建高级层 Azure Databricks 工作区资源,在该区域中,Azure 订阅具有本练习所需计算核心的充足配额;该脚本假设你的用户帐户在订阅中具有足够的权限来创建 Azure Databricks 工作区资源。 如果脚本由于配额或权限不足失败,可以尝试 在 Azure 门户中以交互方式创建 Azure Databricks 工作区。
- 在 Web 浏览器中,登录到 Azure 门户,网址为
https://portal.azure.com
。 -
使用页面顶部搜索栏右侧的 [>_] 按钮在 Azure 门户中创建新的 Cloud Shell,在出现提示时选择“PowerShell”环境并创建存储。 Cloud Shell 在 Azure 门户底部的窗格中提供命令行界面,如下所示:
注意:如果以前创建了使用 Bash 环境的 Cloud shell,请使用 Cloud Shell 窗格左上角的下拉菜单将其更改为“PowerShell”。
-
请注意,可以通过拖动窗格顶部的分隔条或使用窗格右上角的 —、◻ 或 X 图标来调整 Cloud Shell 的大小,以最小化、最大化和关闭窗格 。 有关如何使用 Azure Cloud Shell 的详细信息,请参阅 Azure Cloud Shell 文档。
-
在 PowerShell 窗格中,输入以下命令以克隆此存储库:
rm -r mslearn-databricks -f git clone https://github.com/MicrosoftLearning/mslearn-databricks
-
克隆存储库后,请输入以下命令以运行 setup.ps1 脚本,以在可用区域中预配 Azure Databricks 工作区:
./mslearn-databricks/setup.ps1
- 如果出现提示,请选择要使用的订阅(仅当有权访问多个 Azure 订阅时才会发生这种情况)。
- 等待脚本完成 - 这通常需要大约 5 分钟,但在某些情况下可能需要更长的时间。 在等待时,请查看 Azure Databricks 文档中的 Azure Databricks 上的探索式数据分析一文。
创建群集
Azure Databricks 是一个分布式处理平台,可使用 Apache Spark 群集在多个节点上并行处理数据。 每个群集由一个用于协调工作的驱动程序节点和多个用于执行处理任务的工作器节点组成。 在本练习中,将创建一个单节点群集,以最大程度地减少实验室环境中使用的计算资源(在实验室环境中,资源可能会受到限制)。 在生产环境中,通常会创建具有多个工作器节点的群集。
提示:如果 Azure Databricks 工作区中已有一个具有 13.3 LTS ML 或更高运行时版本的群集,则可以使用它来完成此练习并跳过此过程。
- 在 Azure 门户中,浏览到已由脚本创建的 msl-xxxxxxx* 资源组(或包含现有 Azure Databricks 工作区的资源组)
- 选择 Azure Databricks 服务资源(如果已使用安装脚本创建,则名为 databricks-xxxxxxx*)。
-
在工作区的“概述”** 页中,使用“启动工作区”** 按钮在新的浏览器标签页中打开 Azure Databricks 工作区;请在出现提示时登录。
提示:使用 Databricks 工作区门户时,可能会显示各种提示和通知。 消除这些内容,并按照提供的说明完成本练习中的任务。
- 在左侧边栏中,选择“(+) 新建”任务,然后选择“群集”。
- 在“新建群集”页中,使用以下设置创建新群集:
- 群集名称:用户名的群集(默认群集名称)
- 策略:非受限
- 群集模式:单节点
- 访问模式:单用户(选择你的用户帐户)
- Databricks 运行时版本:13.3 LTS(Spark 3.4.1、Scala 2.12)或更高版本
- 使用 Photon 加速:已选择
- 节点类型:Standard_DS3_v2
- 在处于不活动状态 20 分钟后终止****
- 等待群集创建完成。 这可能需要一到两分钟时间。
注意:如果群集无法启动,则订阅在预配 Azure Databricks 工作区的区域中的配额可能不足。 请参阅 CPU 内核限制阻止创建群集,了解详细信息。 如果发生这种情况,可以尝试删除工作区,并在其他区域创建新工作区。 可以将区域指定为设置脚本的参数,如下所示:
./mslearn-databricks/setup.ps1 eastus
使用 Spark 浏览数据
与许多 Spark 环境一样,Databricks 支持使用笔记本来合并笔记和交互式代码单元格,可用于探索数据。
创建笔记本
- 在边栏中,使用“(+) 新建”** 链接创建笔记本**。
- 将默认笔记本名称(无标题笔记本 [日期])更改为“使用 Spark 浏览数据”,然后在“连接”下拉列表中选择群集(如果尚未选择)。 如果群集未运行,可能需要一分钟左右才能启动。
引入数据
-
在笔记本的第一个单元格中输入以下代码,该代码使用 shell 命令将数据文件从 GitHub 下载到群集使用的文件系统中。**
%sh rm -r /dbfs/spark_lab mkdir /dbfs/spark_lab wget -O /dbfs/spark_lab/2019.csv https://raw.githubusercontent.com/MicrosoftLearning/mslearn-databricks/main/data/2019.csv wget -O /dbfs/spark_lab/2020.csv https://raw.githubusercontent.com/MicrosoftLearning/mslearn-databricks/main/data/2020.csv wget -O /dbfs/spark_lab/2021.csv https://raw.githubusercontent.com/MicrosoftLearning/mslearn-databricks/main/data/2021.csv
-
使用单元格左侧的“▸ 运行单元格”菜单选项来运行该代码**。 然后等待代码运行的 Spark 作业完成。
查询文件中的数据
-
在现有代码单元格下,使用 + 图标添加新的代码单元格**。 然后在新单元格中输入并运行以下代码,以从文件加载数据并查看前 100 行。
df = spark.read.load('spark_lab/*.csv', format='csv') display(df.limit(100))
-
查看输出并注意文件中的数据与销售订单相关,但不包括列标题或有关数据类型的信息。 要让数据更有意义,可以定义数据帧的架构。
-
添加新的代码单元,并使用它来运行以下代码,这会定义数据的架构:
from pyspark.sql.types import * from pyspark.sql.functions import * orderSchema = StructType([ StructField("SalesOrderNumber", StringType()), StructField("SalesOrderLineNumber", IntegerType()), StructField("OrderDate", DateType()), StructField("CustomerName", StringType()), StructField("Email", StringType()), StructField("Item", StringType()), StructField("Quantity", IntegerType()), StructField("UnitPrice", FloatType()), StructField("Tax", FloatType()) ]) df = spark.read.load('/spark_lab/*.csv', format='csv', schema=orderSchema) display(df.limit(100))
-
注意,这一次数据帧将包括列标题。 然后添加新的代码单元,并使用它运行以下代码以显示数据帧架构的详细信息,然后验证是否已应用正确的数据类型:
df.printSchema()
筛选数据帧
- 添加新的代码单元,并使用它运行以下代码,这将:
- 筛选销售订单数据帧的列,以仅包含客户名称和电子邮件地址。
- 统计订单记录总数
- 计算不同客户的数量
- 显示不同的客户
customers = df['CustomerName', 'Email'] print(customers.count()) print(customers.distinct().count()) display(customers.distinct())
观察以下详细信息:
- 对数据帧执行操作时,结果是一个新数据帧(在本例中,是一个新客户数据帧,它是通过从 df 数据帧中选择特定的列子集来创建的)
- 数据帧提供 count 和 distinct 等函数,可用于汇总和筛选它们包含的数据 。
dataframe['Field1', 'Field2', ...]
语法是用于定义列子集的快速方法。 还可以使用 select 方法,所以上述代码的第一行可以编写为customers = df.select("CustomerName", "Email")
-
现在,我们来通过在新的代码单元格中运行以下代码,以应用一个筛选器来仅包括已订购特定产品的客户:
customers = df.select("CustomerName", "Email").where(df['Item']=='Road-250 Red, 52') print(customers.count()) print(customers.distinct().count()) display(customers.distinct())
请注意,你可以将多个函数“链接”到一起,以便一个函数的输出可以成为下一个函数的输入;在这种情况下,由所选方法创建的数据帧是用于应用筛选条件的 where 方法的源数据帧。
在数据帧中对数据进行聚合和分组
-
在新代码单元格中运行以下代码以聚合和分组订单数据:
productSales = df.select("Item", "Quantity").groupBy("Item").sum() display(productSales)
请注意,结果会显示按产品分组的订单数量之和。 groupBy 方法按“项”对行进行分组,随后将 sum 聚合函数应用于所有剩余的数值列(在本例中为“数量”)
-
在新代码单元格中,我们来尝试另一个聚合:
yearlySales = df.select(year("OrderDate").alias("Year")).groupBy("Year").count().orderBy("Year") display(yearlySales)
这一次,结果将显示每年的销售订单数。 请注意,所选方法包括一个 SQL year 函数,可用于提取 OrderDate 字段的年份组件,然后使用 alias 方法为提取的年份值分配一个列名称。 然后,按派生的“年份”列对数据进行分组,并计算每个组中的行计数,最后使用 orderBy 方法对生成的数据帧进行排序。
注意:要详细了解如何在 Azure Databricks 中使用数据帧,请参阅 Azure Databricks 文档中的数据帧简介 - Python。
使用 Spark SQL 查询数据
-
添加一个新代码单元格并使用它运行以下代码:
df.createOrReplaceTempView("salesorders") spark_df = spark.sql("SELECT * FROM salesorders") display(spark_df)
利用之前使用的数据帧对象的本机方法,能够非常有效地查询和分析数据。 但是,许多数据分析师更习惯使用 SQL 语法。 Spark SQL 是 Spark 中的 SQL 语言 API,可用于运行 SQL 语句,甚至可将数据保存在关系表中。 刚刚运行的代码可在数据帧中创建数据的关系视图,然后将使用 spark.sql 库在 Python 代码中嵌入 Spark SQL 语法,查询视图并将结果作为数据帧返回。
在单元格中运行 SQL
-
虽然能够将 SQL 语句嵌入到包含 PySpark 代码的单元格中非常有用,但数据分析师通常只想要直接在 SQL 中工作。 添加一个新代码单元格并使用它运行以下代码。
%sql SELECT YEAR(OrderDate) AS OrderYear, SUM((UnitPrice * Quantity) + Tax) AS GrossRevenue FROM salesorders GROUP BY YEAR(OrderDate) ORDER BY OrderYear;
观察以下情况:
- 单元格开头的 ``%sql` 行(名为 magic)指示应使用 Spark SQL 语言运行时来运行此单元格中的代码,而不是 PySpark。
- SQL 代码会引用之前创建的 salesorder 视图。
- SQL 查询的输出将自动显示为单元格下的结果。
注意:有关 Spark SQL 和数据帧的详细信息,请参阅 Spark SQL 文档。
使用 Spark 直观呈现数据
众所周知,一张图片胜过千言万语,而图表通常胜过千行数据。 虽然 Azure Databricks 中的笔记本包含对可视化数据帧或 Spark SQL 查询中数据的支持,但它并非专为全面的图表设计。 但是,可以使用 Python 图形库(如 matplotlib 和 seaborn)根据数据帧中的数据创建图表 。
以可视化效果的形式查看结果
-
在新代码单元格中,运行以下代码以查询 saleorders 表:
%sql SELECT * FROM salesorders
- 在结果表上方,选择 +,然后选择“可视化效果”以查看可视化效果编辑器,然后应用以下选项****:
- 可视化效果类型:条形图
- X 列:项
- Y 列:添加新列并选择“数量”。 应用 Sum 聚合。
- 保存可视化效果,然后重新运行代码单元以查看笔记本中生成的图表。
matplotlib 入门
-
在新代码单元格中,运行以下代码,以将某些销售订单数据检索到数据帧中:
sqlQuery = "SELECT CAST(YEAR(OrderDate) AS CHAR(4)) AS OrderYear, \ SUM((UnitPrice * Quantity) + Tax) AS GrossRevenue \ FROM salesorders \ GROUP BY CAST(YEAR(OrderDate) AS CHAR(4)) \ ORDER BY OrderYear" df_spark = spark.sql(sqlQuery) df_spark.show()
-
添加新的代码单元格并使用它来运行以下代码,这会导入 matplotlb 并使用它来创建图表:
from matplotlib import pyplot as plt # matplotlib requires a Pandas dataframe, not a Spark one df_sales = df_spark.toPandas() # Create a bar plot of revenue by year plt.bar(x=df_sales['OrderYear'], height=df_sales['GrossRevenue']) # Display the plot plt.show()
- 查看结果,其中包含每年总收入的柱形图。 请注意用于生成此图表的代码的以下功能:
- matplotlib 库需要 Pandas 数据帧,因此需要将 Spark SQL 查询返回的 Spark 数据帧转换为此格式。
- matplotlib 库的核心是 pyplot 对象 。 这是大多数绘图功能的基础。
-
默认设置会生成一个可用的图表,但它有很大的自定义空间。 添加包含以下代码的新代码单元格并运行它:
# Clear the plot area plt.clf() # Create a bar plot of revenue by year plt.bar(x=df_sales['OrderYear'], height=df_sales['GrossRevenue'], color='orange') # Customize the chart plt.title('Revenue by Year') plt.xlabel('Year') plt.ylabel('Revenue') plt.grid(color='#95a5a6', linestyle='--', linewidth=2, axis='y', alpha=0.7) plt.xticks(rotation=45) # Show the figure plt.show()
-
严格来说,绘图包含图。 在前面的示例中,图是隐式创建的;但也可以显式创建它。 尝试在新单元格中运行以下命令:
# Clear the plot area plt.clf() # Create a Figure fig = plt.figure(figsize=(8,3)) # Create a bar plot of revenue by year plt.bar(x=df_sales['OrderYear'], height=df_sales['GrossRevenue'], color='orange') # Customize the chart plt.title('Revenue by Year') plt.xlabel('Year') plt.ylabel('Revenue') plt.grid(color='#95a5a6', linestyle='--', linewidth=2, axis='y', alpha=0.7) plt.xticks(rotation=45) # Show the figure plt.show()
-
图可以包含多个子图,每个子图都其自己的轴上。 使用此代码创建多个图表:
# Clear the plot area plt.clf() # Create a figure for 2 subplots (1 row, 2 columns) fig, ax = plt.subplots(1, 2, figsize = (10,4)) # Create a bar plot of revenue by year on the first axis ax[0].bar(x=df_sales['OrderYear'], height=df_sales['GrossRevenue'], color='orange') ax[0].set_title('Revenue by Year') # Create a pie chart of yearly order counts on the second axis yearly_counts = df_sales['OrderYear'].value_counts() ax[1].pie(yearly_counts) ax[1].set_title('Orders per Year') ax[1].legend(yearly_counts.keys().tolist()) # Add a title to the Figure fig.suptitle('Sales Data') # Show the figure plt.show()
注意:若要详细了解如何使用 matplotlib 绘图,请参阅 matplotlib 文档。
使用 seaborn 库
-
添加新的代码单元格,并使用它来运行以下代码,这会使用 seaborn 库(该库基于 matplotlib 构建并抽象其一些复杂性)来创建图表:
import seaborn as sns # Clear the plot area plt.clf() # Create a bar chart ax = sns.barplot(x="OrderYear", y="GrossRevenue", data=df_sales) plt.show()
-
Seaborn 库可以简化复杂统计数据绘图的创建,并支持你你控制视觉主题,以实现一致的数据可视化效果。 将以下代码粘贴到新单元格中:
# Clear the plot area plt.clf() # Set the visual theme for seaborn sns.set_theme(style="whitegrid") # Create a bar chart ax = sns.barplot(x="OrderYear", y="GrossRevenue", data=df_sales) plt.show()
-
就像 matplotlib 一样。 seaborn 支持多种图表类型。 运行以下代码以创建折线图:
# Clear the plot area plt.clf() # Create a bar chart ax = sns.lineplot(x="OrderYear", y="GrossRevenue", data=df_sales) plt.show()
注意:若要详细了解如何使用 seaborn 绘图,请参阅 seaborn 文档。
清理
在 Azure Databricks 门户的“计算”页上,选择群集,然后选择“■ 终止”以将其关闭。
如果已完成对 Azure Databricks 的探索,则可以删除已创建的资源,以避免产生不必要的 Azure 成本并释放订阅中的容量。