''')
return result
if __name__ == "__main__":
spark = SparkSession.builder.appName('TestAPP').enableHiveSupport().getOrCreate()
inventory = spark.sql('''select * from inventory''')
ratio = spark.sql('''select * from inventory_ratio''')
result = get_inventory_city(spark, inventory, ratio)
result.write.csv(path="somepath/inventory_city", mode="overwrite")
修改后的函数get_inventory_city有3个输入参数和1个返回参数,函数内部已经不再包含对spark session和数据库表的处理,这意味着对于确定的输入值,它总会输出不变的结果。
这比之前的设计更加理想,因为函数只包含纯逻辑,所以调用者使用它时不会再受到副作用的干扰,这使得函数的可测试性和可组合性得到了提高。
测试代码
创建一个test_data目录,将csv格式的测试数据保存到里面。测试数据的来源可以是手工模拟制作,也可以是生产环境导出。
然后创建测试文件,添加代码,
from inventory import get_inventory_city
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('TestAPP').enableHiveSupport().getOrCreate()
def test_get_inventory_city():
#导入测试数据
inventory = spark.read.format("csv").option("header", "true").load("./test_data/inventory.csv")
ratio = spark.read.format("csv").option("header", "true").load("./test_data/inventory_ratio.csv")
#执行函数
result = get_inventory_city(spark, inventory, ratio)
#验证拆分后的总数量等于拆分前的总数量
result.createOrReplaceTempView('v_result')
inventory.createOrReplaceTempView('v_inventory')
qty_before_split = spark.sql('''select sum(qty) as qty from v_inventory''')
qty_after_split = spark.sql('''select sum(qty) as qty from v_result''')
assert qty_before_split.take(1)[0]['qty'] == qty_after_split.take(1)[0]['qty']
执行测试,可以看到以下输出内容
============================= test session starts ============================= platform win32 -- Python 3.6.8, pytest-4.3.1, py-1.8.0, pluggy-0.9.0 rootdir: C:\Users\zhaozhe42\PycharmProjects\spark_unit\unit, inifile:collected 1 item
test_get_inventory_city.py .2019-03-21 14:16:24 WARN ObjectStore:568 - Failed to get database global_temp, returning NoSuchObjectException [100%] ========================= 1 passed in 18.06 seconds ==========================
这样一个单元测试例子就完成了。
相比把程序放到服务器测试,单元测试的运行速度更快,开发者不用再担心测试会对生产作业和用户造成影响,也可以更早发现在编码期间犯下的错误。它也可以成为自动化测试的基础。
待解决的问题
目前我已经可以在项目中构建初步的单元测试,但依然面临着一些问题。
运行时间
上面这个简单的测试示例在我的联想T470笔记本上需要花费18.06秒执行完成,而实际项目中的程序的复杂度要更高,执行时间也更长。执行时间过长一件糟糕的事情,因为单元测试的执行花费越大,就会越被开发者拒斥。面对显示器等待单元测试执行完成的时间是难捱的。虽然相比于把程序丢到生产系统中执行,这种单元测试模式已经可以节约不少时间,但还不够好。
接下来可能会尝试的解决办法:提升电脑配置/改变测试数据的导入方式。
有效范围
在生产实践中构建纯函数是一件不太容易的事情,它对开发者的设计和编码能力有相当的要求。
单元测试虽然能帮助发现一些问题和确定问题代码范围,但它似乎并不能揭示错误的原因。只靠单元测试,不能完全证明代码的正确性。
笔者水平有限,目前写出的代码中仍有很多单元测试力所不能及的地方。可能需要在实践中对它们进行改进,或者引入其它测试手段作为补充。
参考
一些参考内容。
配置
Getting Started with PySpark on Windows
win10下安装pyspark
PyCharm中的pytest
pycharm 配置spark 2.2.0
阅读
函数响应式领域建模
ABAP单元测试最佳实践
|