测试是软件开发中的基础工作,它经常被数据开发者忽视,但是它很重要。在本文中会展示如何使用Python的uniittest.mock库对一段PySpark代码进行测试。笔者会从数据科学家的视角来进行描述,这意味着本文将不会深入某些软件开发的细节。
本文链接:https://www.cnblogs.com/hhelibeb/p/10508692.html
英文原文:Stop mocking me! Unit tests in PySpark using Python’s mock library
单元测试和mock是什么?
单元测试是一种测试代码片段的方式,确保代码片段按预期工作。Python中的uniittest.mock库,允许人们将部分代码替换为mock对象,并对人们使用这些mock对象的方式进行断言。“mock”的功能如名字所示——它模仿代码中的对象/变量的属性。
最终目标:测试spark.sql(query)
PySpark中最简单的创建dataframe的方式如下:
df = spark.sql("SELECT * FROM table")
虽然它很简单,但依然应该被测试。
准备代码和问题
假设我们为一家电子商务服装公司服务,我们的目标是创建产品相似度表,用某些条件过滤数据,把它们写入到HDFS中。
假设我们有如下的表:
1. Products. Columns: “item_id”, “category_id”.
2. Product_similarity (unfiltered). Columns: “item_id_1”, “item_id_2”, “similarity_score”.
(假设Product_similarity中的相似度分数在0~1之间,越接近1,就越相似。)
查看一对产品和它们的相似度分数是很简单的:
SELECT s.item_id_1, s.item_id_2, s.similarity_score FROM product_similarity s WHERE s.item_id_1 != s.item_id_2
where子句将和自身对比的项目移除。否则的话会得到分数为1的结果,没有意义!
要是我们想要创建一个展示相同目录下的产品的相似度的表呢?要是我们不关心鞋子和围巾的相似度,但是想要比较不同的鞋子与鞋子、围巾与围巾呢?这会有点复杂,需要我们连接“product”和“product_similarity”两个表。
查询语句变为:
SELECT s.item_id_1, s.item_id_2, s.similarity_score FROM product_similarity s INNER JOIN products p ON s.item_id_1 = p.item_id INNER JOIN products q ON s.item_id_2 = q.item_id WHERE s.item_id_1 != s.item_id_2 AND p.category_id = q.category_i
我们也可能想得知与每个产品最相似的N个其它项目,在该情况下,查询语句为:
SELECT s.item_id_1, s.item_id_2, s.similarity_score FROM ( SELECT s.item_id_1, s.item_id_2, s.similarity_score, ROW_NUMBER() OVER(PARTITION BY item_id_1 ORDER BY similarity_score DESC) as row_num FROM product_similarity s INNER JOIN products p ON s.item_id_1 = p.item_id INNER JOIN products q ON s.item_id_2 = q.item_id WHERE s.item_id_1 != s.item_id_2 AND p.category_id = q.category_id ) WHERE row_num <= 10
(假设N=10)
现在,要是我们希望跨产品目录比较和在产品目录内比较两种功能成为一个可选项呢?我们可以通过使用名为same_category的布尔变量,它会控制一个字符串变量same_category_q的值,并将其传入查询语句(通过.format())。如果same_category为True,则same_category_q中为inner join的内容,反之,则为空。查询语句如下:
''' SELECT s.item_id_1, s.item_id_2, s.similarity_score FROM product_similarity s {same_category_q} '''.format(same_category_q='') # Depends on value of same_category boolean
(译注:Python 3.6以上可以使用f-Strings代替format)
让我们把它写得更清楚点,用function包装一下,
def make_query(same_category, table_paths): if same_category is True: same_category_q = ''' INNER JOIN {product_table} p ON s.item_id_1 = p.item_id INNER JOIN {product_table} q ON s.item_id_2 = q.item_id WHERE item_id_1 != item_id_2 AND p.category_id = q.category_id '''.format(product_table=table_paths["products"]["table"]) else: same_category_q = '' return same_category_q
到目前为止,很不错。我们输出了same_category_q,因此可以通过测试来确保它确实返回了所需的值。
回忆我们的目标,我们需要将dataframe写入HDFS,我们可以通过如下方法来测试函数:
def create_new_table(spark, table_paths, params, same_category_q): similarity_table = table_paths["product_similarity"]["table"] created_table = spark.sql(create_table_query.format(similarity_table=similarity_table, same_category_q=same_category_q, num_items=params["num_items"])) # Write table to some path created_table.coale