设为首页 加入收藏

TOP

我的Spark SQL单元测试实践(二)
2019-09-17 18:42:58 】 浏览:71
Tags:Spark SQL 单元 测试 实践
''') return result if __name__ == "__main__": spark = SparkSession.builder.appName('TestAPP').enableHiveSupport().getOrCreate() inventory = spark.sql('''select * from inventory''') ratio = spark.sql('''select * from inventory_ratio''') result = get_inventory_city(spark, inventory, ratio) result.write.csv(path="somepath/inventory_city", mode="overwrite")

修改后的函数get_inventory_city有3个输入参数和1个返回参数,函数内部已经不再包含对spark session和数据库表的处理,这意味着对于确定的输入值,它总会输出不变的结果。

这比之前的设计更加理想,因为函数只包含纯逻辑,所以调用者使用它时不会再受到副作用的干扰,这使得函数的可测试性和可组合性得到了提高。

测试代码

创建一个test_data目录,将csv格式的测试数据保存到里面。测试数据的来源可以是手工模拟制作,也可以是生产环境导出。

然后创建测试文件,添加代码,

from inventory import get_inventory_city
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('TestAPP').enableHiveSupport().getOrCreate()

def test_get_inventory_city():

    #导入测试数据
    inventory = spark.read.format("csv").option("header", "true").load("./test_data/inventory.csv")
    ratio     = spark.read.format("csv").option("header", "true").load("./test_data/inventory_ratio.csv")

    #执行函数
    result = get_inventory_city(spark, inventory, ratio)

    #验证拆分后的总数量等于拆分前的总数量
    result.createOrReplaceTempView('v_result')
    inventory.createOrReplaceTempView('v_inventory')

    qty_before_split = spark.sql('''select sum(qty) as qty from v_inventory''')
    qty_after_split  = spark.sql('''select sum(qty) as qty from v_result''')

    assert qty_before_split.take(1)[0]['qty'] == qty_after_split.take(1)[0]['qty']

执行测试,可以看到以下输出内容

============================= test session starts =============================
platform win32 -- Python 3.6.8, pytest-4.3.1, py-1.8.0, pluggy-0.9.0
rootdir: C:\Users\zhaozhe42\PycharmProjects\spark_unit\unit, inifile:collected 1 item

test_get_inventory_city.py .2019-03-21 14:16:24 WARN  ObjectStore:568 - Failed to get database global_temp, returning NoSuchObjectException
                                             [100%]
========================= 1 passed in 18.06 seconds ==========================

这样一个单元测试例子就完成了。

相比把程序放到服务器测试,单元测试的运行速度更快,开发者不用再担心测试会对生产作业和用户造成影响,也可以更早发现在编码期间犯下的错误。它也可以成为自动化测试的基础。

待解决的问题

目前我已经可以在项目中构建初步的单元测试,但依然面临着一些问题。

运行时间

上面这个简单的测试示例在我的联想T470笔记本上需要花费18.06秒执行完成,而实际项目中的程序的复杂度要更高,执行时间也更长。执行时间过长一件糟糕的事情,因为单元测试的执行花费越大,就会越被开发者拒斥。面对显示器等待单元测试执行完成的时间是难捱的。虽然相比于把程序丢到生产系统中执行,这种单元测试模式已经可以节约不少时间,但还不够好。

接下来可能会尝试的解决办法:提升电脑配置/改变测试数据的导入方式。

有效范围

在生产实践中构建纯函数是一件不太容易的事情,它对开发者的设计和编码能力有相当的要求。

单元测试虽然能帮助发现一些问题和确定问题代码范围,但它似乎并不能揭示错误的原因。只靠单元测试,不能完全证明代码的正确性。

笔者水平有限,目前写出的代码中仍有很多单元测试力所不能及的地方。可能需要在实践中对它们进行改进,或者引入其它测试手段作为补充。

参考

一些参考内容。

配置

Getting Started with PySpark on Windows

win10下安装pyspark

PyCharm中的pytest

pycharm 配置spark 2.2.0

阅读

函数响应式领域建模

ABAP单元测试最佳实践

 

 
首页 上一页 1 2 下一页 尾页 2/2/2
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇DBlink的创建与删除 下一篇Mysql的命令总结

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目