设为首页 加入收藏

TOP

我的Spark SQL单元测试实践(一)
2019-09-17 18:42:58 】 浏览:68
Tags:Spark SQL 单元 测试 实践

最近加入一个Spark项目,作为临时的开发人员协助进行开发工作。该项目中不存在测试的概念,开发人员按需求进行编码工作后,直接向生产系统部署,再由需求的提出者在生产系统检验程序运行结果的正确性。在这种原始的工作方式下,产品经理和开发人员总是在生产系统验证自己的需求、代码。可以想见,各种直接交给用户的错误导致了一系列的事故和不信任。为了处理各类线上问题,大家都疲于奔命。当工作进行到后期,每一个相关人都已经意气消沉,常常对工作避之不及。

为了改善局面,我尝试了重构部分代码,将连篇的SQL分散到不同的方法里,并对单个方法构建单元测试。目的是,在编码完成后,首先在本地执行单元测试,以实现:

  1. 部署到生产系统的代码中无SQL语法错误。
  2. 将已出现的bug写入测试用例,避免反复出现相同的bug。
  3. 提前发现一些错误,减少影响到后续环节的问题。
  4. 通过自动化减少开发和程序问题处理的总时间花费。
  5. 通过流程和结果的改善,减少开发人员的思维负担,增加与其他相关人的互信。

本文将介绍我的Spark单元测试实践,供大家参考、批评。

本文中的Spark API是PySpark,测试框架为pytest。

对于希望将本文当作单元测试教程使用的读者,本文会假定读者已经准备好了开发和测试所需要的环境。如果没有也没有关系,文末的参考部分会包含一些配置环境相关的链接。

 

本文链接:https://www.cnblogs.com/hhelibeb/p/10534862.html

原创内容,转载请注明

概念

定义

单元测试是一种测试方法,它的对象是单个程序单元/组件,目的是验证软件的每个组件都符合设计要求。

单元是软件中最小的可测试部分。它通常包含一些输入和单一的输出。

本文中的单元就是python函数(function)。

单元测试通常是程序开发人员的工作。

原则

为了实现单元测试,函数最好符合一个条件,

  • 对于相同的输入,函数总有相同的输出。

这要求函数的输出结果不依赖内外部状态。

它的输出结果的确定不应该依赖输入参数外的任何内容,例如,不可以因为本地测试环境中没有相应的数据库就产生“连接数据库异常”导致无法返回结果。如果是类方法的话,也不可以依据一个可能被改变的类属性来决定输出。

同时,函数内部不能存在“副作用”。它不应该改变除了返回结果以外的任何内容,例如,不可以改变全局可变状态。

满足以上条件的函数,可以被称为“纯函数”。

代码实践

下面是数据和程序部分。

数据

假设我们的服务对象是一家水果运销公司,公司在不同城市设有仓库,现有三张表,其中inventory包含水果的总库存数量信息,inventory_ratio包含水果在不同城市的应有比例,

目标是根据总库存数量和比例算出水果在各地的库存,写入到第三张表inventory_city中。三张表的列如下,

1. inventory. Columns: “item”, “qty”.
2. inventory_ratio. Columns: “item”, “city”, “ratio”.
3. inventory_city. Columns: “item”, “city”, “qty”.

第一版代码

用最直接的方式实现这一功能,代码将是,

from pyspark.sql import SparkSession

if __name__ == "__main__":

    spark = SparkSession.builder.appName('TestAPP').enableHiveSupport().getOrCreate()

    result = spark.sql('''select t1.item, t2.city,
                                 case when t2.ratio is not null then t1.qty * t2.ratio 
                                      else                           t1.qty
                                 end as qty     
                          from      v_inventory as t1
                          left join v_ratio     as t2 on t1.item = t2.item ''')

    result.write.csv(path="somepath/inventory_city", mode="overwrite")

 

这段代码可以实现计算各城市库存的需求,但测试起来会不太容易。特别是如果未来我们还要在这个程序中增加其他逻辑的话,不同的逻辑混杂在一起后,测试和修改都会变得麻烦。

所以,在下一步,我们要将部分代码封装到一个函数中。

有副作用的函数

创建一个名为get_inventory_city的函数,将代码包含在内,

from pyspark.sql import SparkSession

def get_inventory_city():
    
    spark = SparkSession.builder.appName('TestAPP').enableHiveSupport().getOrCreate()
    
    result = spark.sql('''select t1.item, t2.city, case when t2.ratio is not null then t1.qty * t2.ratio else t1.qty end as qty from v_inventory as t1 left join v_ratio as t2 on t1.item = t2.item ''')
    result.write.csv(path="somepath/inventory_city", mode="overwrite") if __name__ == "__main__": get_inventory_city()

显然,这是一个不太易于测试的函数,因为它,

  • 没有输入输出参数,不能直接根据给定数据检验运行结果。
  • 包含对数据库的读/写,这意味着它要依赖外部数据库。
  • 包含对spark session的获取/创建,这和计算库存的逻辑也毫无关系。

我们把这些函数中的多余的东西称为副作用。副作用和函数的核心逻辑纠缠在一起,使单元测试变得困难,也不利于代码的模块化。

我们必须另外管理副作用,只在函数内部保留纯逻辑

无副作用的函数

按照上文中提到的原则,重新设计函数,可以得到,

from pyspark.sql import SparkSession, DataFrame

def get_inventory_city(spark: SparkSession, inventory: DataFrame, ratio: DataFrame):

    inventory.createOrReplaceTempView('v_inventory')
    ratio.createOrReplaceTempView('v_ratio')

    result = spark.sql('''select t1.item, t2.city,
                                 case when t2.ratio is not null then t1.qty * t2.ratio 
                                      else                           t1.qty
                                 end as qty     
                          from      v_inventory as t1
                          left join v_ratio     as t2 on t1.item = t2.item
首页 上一页 1 2 下一页 尾页 1/2/2
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇DBlink的创建与删除 下一篇Mysql的命令总结

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目