Spark UDF in Practice: JSON Parsing
Copyright notice: this is an original article by the author; reproduction without permission is prohibited. https://blog.csdn.net/qq_33689414/article/details/85125302


When we use Spark to process JSON data, we usually constrain the fields with a schema. However, some fields in the JSON need special handling before they can be extracted, and for those we turn to a UDF.
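As a quick reference, here is a minimal sketch of what such an explicit schema might look like when reading this kind of report with spark.read.json. It is only illustrative and covers just the fields used later in this post; the field names follow the sample data shown below.

from pyspark.sql.types import (StructType, StructField, StringType,
                               LongType, ArrayType)

# Partial, illustrative schema: only the fields this example actually uses.
schema = StructType([
    StructField("final_score", LongType()),
    StructField("final_decision", StringType()),
    StructField("risk_items", ArrayType(StructType([
        StructField("item_name", StringType()),
        StructField("risk_level", StringType()),
        StructField("item_detail", StructType([
            StructField("platform_count", LongType()),
        ])),
    ]))),
])

# df = spark.read.json("1.json", schema=schema)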

Below is an example that parses a JSON record.

JSON source data:

{"final_score":16, "risk_items":[{"item_id":3403925, "item_name":"7天内申请人在多个平台申请借款", "risk_level":"high", "group":"多平台借贷申请检测", "item_detail":{"discredit_times":null, "overdue_details":null, "platform_count":2, "court_details":null, "fraud_type":null, "platform_detail":["一般消费分期平台:1", "P2P网贷:1"], "high_risk_areas":null, "hit_list_datas":null, "frequency_detail_list":null}},{"item_id":3403927,"item_name":"1个月内申请人在多个平台申请借款","risk_level":"medium","group":"多平台借贷申请检测","item_detail":{"discredit_times":null,"overdue_details":null,"platform_count":2,"court_details":null,"fraud_type":null,"platform_detail":["一般消费分期平台:1","P2P网贷:1"],"high_risk_areas":null,"hit_list_datas":null,"frequency_detail_list":null}},{"item_id":3403929,"item_name":"3个月内申请人在多个平台申请借款","risk_level":"medium","group":"多平台借贷申请检测","item_detail":{"discredit_times":null,"overdue_details":null,"platform_count":2,"court_details":null,"fraud_type":null,"platform_detail":["一般消费分期平台:1","P2P网贷:1"],"high_risk_areas":null,"hit_list_datas":null,"frequency_detail_list":null}},{"item_id":3403931,"item_name":"6个月内申请人在多个平台申请借款","risk_level":"medium","group":"多平台借贷申请检测","item_detail":{"discredit_times":null,"overdue_details":null,"platform_count":2,"court_details":null,"fraud_type":null,"platform_detail":["一般消费分期平台:1","P2P网贷:1"],"high_risk_areas":null,"hit_list_datas":null,"frequency_detail_list":null}},{"item_id":3403935,"item_name":"18个月内申请人在多个平台申请借款","risk_level":"low","group":"多平台借贷申请检测","item_detail":{"discredit_times":null,"overdue_details":null,"platform_count":2,"court_details":null,"fraud_type":null,"platform_detail":["一般消费分期平台:1","P2P网贷:1"],"high_risk_areas":null,"hit_list_datas":null,"frequency_detail_list":null}},{"item_id":3403937,"item_name":"24个月内申请人在多个平台申请借款","risk_level":"low","group":"多平台借贷申请检测","item_detail":{"discredit_times":null,"overdue_details":null,"platform_count":2,"court_details":null,"fraud_type":null,"platform_detail":["一般消费分期平台:1","P2P网贷:1"],"high_risk_areas":null,"hit_list_datas":null,"frequency_detail_list":null}},{"item_id":3403939,"item_name":"60个月以上申请人在多个平台申请借款","risk_level":"low","group":"多平台借贷申请检测","item_detail":{"discredit_times":null,"overdue_details":null,"platform_count":2,"court_details":null,"fraud_type":null,"platform_detail":["一般消费分期平台:1","P2P网贷:1"],"high_risk_areas":null,"hit_list_datas":null,"frequency_detail_list":null}}],"final_decision":"Accept","report_time":1495377281000,"success":true,"report_id":"ER2017052122344113605405","apply_time":1495377281000}

From this record we need to extract the platform_count value for the items whose item_name is 7天内申请人在多个平台申请借款, 1个月内申请人在多个平台申请借款, 3个月内申请人在多个平台申请借款, and 6个月内申请人在多个平台申请借款 respectively.
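To make the extraction rule concrete before bringing Spark in, the lookup can be sketched in plain Python with the json module (the function and variable names here are illustrative only):

import json

def platform_count_for(report_json, target_item_name):
    # Return item_detail.platform_count for the risk item whose item_name
    # matches target_item_name, or None when there is no match.
    report = json.loads(report_json)
    for item in report.get("risk_items", []):
        if item.get("item_name") == target_item_name:
            detail = item.get("item_detail") or {}
            return detail.get("platform_count")
    return None

# e.g. platform_count_for(raw_json, "7天内申请人在多个平台申请借款") returns 2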

Now for the Spark code:

import os

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

if __name__ == '__main__':

    # Define the UDF: given the risk_items array and a target item_name,
    # return the matching item's item_detail.platform_count.
    def parse_1(risk_items, item_name):
        if risk_items is None:
            return ""
        for i in risk_items:
            try:
                if i.item_name == item_name:
                    return i.item_detail.platform_count
            except AttributeError:
                # Skip malformed items rather than aborting the whole lookup
                continue
        return ""


    # Work around a Python environment issue: the machine defaults to Python 3,
    # but this example runs on Python 2.7
    os.environ['PYSPARK_PYTHON'] = '/usr/bin/python'

    spark = SparkSession \
        .builder \
        .appName("application") \
        .master("local") \
        .getOrCreate()

    # Register the UDF: the Spark SQL name is td_parse1, backed by the Python function parse_1
    spark.udf.register("td_parse1", parse_1)
    
    # Read the JSON data
    df = spark.read.json("1.json")
    
    # Register the DataFrame as a temporary view
    df.createOrReplaceTempView("tmp")

    # Define the Spark SQL query
    resDf = spark.sql(
        """
        select
        final_score as td_final_score,
        td_parse1(risk_items,'7天内申请人在多个平台申请借款') as td_platform_count_7d,
        td_parse1(risk_items,'1个月内申请人在多个平台申请借款') as td_platform_count_1m,
        td_parse1(risk_items,'3个月内申请人在多个平台申请借款') as td_platform_count_3m,
        td_parse1(risk_items,'6个月内申请人在多个平台申请借款') as td_platform_count_6m
        from tmp""")

    # Show the result
    resDf.show()
    spark.stop()
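Note that spark.udf.register is called above without an explicit return type, so td_parse1 returns strings (StringType is the default return type for registered Python UDFs). If you would rather get platform_count back as an integer column, you can pass a return type when registering, roughly like this (a sketch only; parse_1 should then return None instead of "" when nothing matches):

from pyspark.sql.types import IntegerType

# Same UDF, but declared to return an integer column instead of a string.
spark.udf.register("td_parse1", parse_1, IntegerType())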

The parsed result (the output of resDf.show()):

[screenshot of the resDf.show() output]
