最近加入一个Spark项目,作为临时的开发人员协助进行开发工作。该项目中不存在测试的概念,开发人员按需求进行编码工作后,直接向生产系统部署,再由需求的提出者在生产系统检验程序运行结果的正确性。在这种原始的工作方式下,产品经理和开发人员总是在生产系统验证自己的需求、代码。可以想见,各种直接交给用户的错误导致了一系列的事故和不信任。为了处理各类线上问题,大家都疲于奔命。当工作进行到后期,每一个相关人都已经意气消沉,常常对工作避之不及。
为了改善局面,我尝试了重构部分代码,将连篇的SQL分散到不同的方法里,并对单个方法构建单元测试。目的是,在编码完成后,首先在本地执行单元测试,以实现:
- 部署到生产系统的代码中无SQL语法错误。
- 将已出现的bug写入测试用例,避免反复出现相同的bug。
- 提前发现一些错误,减少影响到后续环节的问题。
- 通过自动化减少开发和程序问题处理的总时间花费。
- 通过流程和结果的改善,减少开发人员的思维负担,增加与其他相关人的互信。
本文将介绍我的Spark单元测试实践,供大家参考、批评。
本文中的Spark API是PySpark,测试框架为pytest。
对于希望将本文当作单元测试教程使用的读者,本文会假定读者已经准备好了开发和测试所需要的环境。如果没有也没有关系,文末的参考部分会包含一些配置环境相关的链接。
本文链接:https://www.cnblogs.com/hhelibeb/p/10534862.html
原创内容,转载请注明
概念
定义
单元测试是一种测试方法,它的对象是单个程序单元/组件,目的是验证软件的每个组件都符合设计要求。
单元是软件中最小的可测试部分。它通常包含一些输入和单一的输出。
本文中的单元就是python函数(function)。
单元测试通常是程序开发人员的工作。
原则
为了实现单元测试,函数最好符合一个条件,
- 对于相同的输入,函数总有相同的输出。
这要求函数的输出结果不依赖内外部状态。
它的输出结果的确定不应该依赖输入参数外的任何内容,例如,不可以因为本地测试环境中没有相应的数据库就产生“连接数据库异常”导致无法返回结果。如果是类方法的话,也不可以依据一个可能被改变的类属性来决定输出。
同时,函数内部不能存在“副作用”。它不应该改变除了返回结果以外的任何内容,例如,不可以改变全局可变状态。
满足以上条件的函数,可以被称为“纯函数”。
代码实践
下面是数据和程序部分。
数据
假设我们的服务对象是一家水果运销公司,公司在不同城市设有仓库,现有三张表,其中inventory包含水果的总库存数量信息,inventory_ratio包含水果在不同城市的应有比例,
目标是根据总库存数量和比例算出水果在各地的库存,写入到第三张表inventory_city中。三张表的列如下,
1. inventory. Columns: “item”, “qty”.
2. inventory_ratio. Columns: “item”, “city”, “ratio”.
3. inventory_city. Columns: “item”, “city”, “qty”.
第一版代码
用最直接的方式实现这一功能,代码将是,
from pyspark.sql import SparkSession if __name__ == "__main__": spark = SparkSession.builder.appName('TestAPP').enableHiveSupport().getOrCreate() result = spark.sql('''select t1.item, t2.city, case when t2.ratio is not null then t1.qty * t2.ratio else t1.qty end as qty from v_inventory as t1 left join v_ratio as t2 on t1.item = t2.item ''') result.write.csv(path="somepath/inventory_city", mode="overwrite")
这段代码可以实现计算各城市库存的需求,但测试起来会不太容易。特别是如果未来我们还要在这个程序中增加其他逻辑的话,不同的逻辑混杂在一起后,测试和修改都会变得麻烦。
所以,在下一步,我们要将部分代码封装到一个函数中。
有副作用的函数
创建一个名为get_inventory_city的函数,将代码包含在内,
from pyspark.sql import SparkSession def get_inventory_city(): spark = SparkSession.builder.appName('TestAPP').enableHiveSupport().getOrCreate()
result = spark.sql('''select t1.item, t2.city, case when t2.ratio is not null then t1.qty * t2.ratio else t1.qty end as qty from v_inventory as t1 left join v_ratio as t2 on t1.item = t2.item ''')
result.write.csv(path="somepath/inventory_city", mode="overwrite") if __name__ == "__main__": get_inventory_city()
显然,这是一个不太易于测试的函数,因为它,
- 没有输入输出参数,不能直接根据给定数据检验运行结果。
- 包含对数据库的读/写,这意味着它要依赖外部数据库。
- 包含对spark session的获取/创建,这和计算库存的逻辑也毫无关系。
我们把这些函数中的多余的东西称为副作用。副作用和函数的核心逻辑纠缠在一起,使单元测试变得困难,也不利于代码的模块化。
我们必须另外管理副作用,只在函数内部保留纯逻辑。
无副作用的函数
按照上文中提到的原则,重新设计函数,可以得到,
from pyspark.sql import SparkSession, DataFrame def get_inventory_city(spark: SparkSession, inventory: DataFrame, ratio: DataFrame): inventory.createOrReplaceTempView('v_inventory') ratio.createOrReplaceTempView('v_ratio') result = spark.sql('''select t1.item, t2.city, case when t2.ratio is not null then t1.qty * t2.ratio else t1.qty end as qty from v_inventory as t1 left join v_ratio as t2 on t1.item = t2.item