设为首页 加入收藏

TOP

Spark 之 内置函数 JSON
2019-02-12 01:21:53 】 浏览:54
Tags:Spark 内置 函数 JSON

转自:http://blog.csdn.net/yizishou/article/details/52398665



需求

将DataFrame中的StructType类型字段下的所有内容转换为Json字符串。

Spark版本: 1.6.1

思路

  • DataFrame有toJSON方法,可将每

    个Row都转为一个Json字符串,并返回RDD[String]
  • DataFrame.write.json方法,可将数据写为Json格式文件

跟踪上述两处代码,发现最终都会调用Spark源码中的org.apache.spark.sql.execution.datasources.json.JacksonGenerator类,使用Jackson,根据传入的StructType、JsonGenerator和InternalRow,生成Json字符串。

开发

我们的函数只需传入一个参数,就是需要转换的列,因此需要实现org.apache.spark.sql.catalyst.expressions包下的UnaryExpression。

后续对功能进行了扩展,不是StructType类型的输入也可以转换。

  1. packageorg.apache.spark.sql.catalyst.expressions
  2. importjava.io.CharArrayWriter
  3. importorg.apache.spark.sql.catalyst.InternalRow
  4. importorg.apache.spark.sql.catalyst.analysis.TypeCheckResult
  5. importorg.apache.spark.sql.catalyst.expressions.codegen.CodeGenContext
  6. importorg.apache.spark.sql.catalyst.expressions.codegen.GeneratedExpressionCode
  7. importorg.apache.spark.sql.execution.datasources.json.JacksonGenerator
  8. importorg.apache.spark.sql.types.DataType
  9. importorg.apache.spark.sql.types.Metadata
  10. importorg.apache.spark.sql.types.StringType
  11. importorg.apache.spark.sql.types.StructField
  12. importorg.apache.spark.sql.types.StructType
  13. importcom.fasterxml.jackson.core.JsonFactory
  14. importorg.apache.spark.unsafe.types.UTF8String
  15. /**
  16. *将StructType类型的字段转换为JsonString
  17. *@authoryizhu.sun2016年8月30日
  18. */
  19. caseclassJson(child:Expression)extendsUnaryExpressionwithImplicitCastInputTypes{
  20. overridedefdataType:DataType=StringType
  21. overridedefinputTypes:Seq[DataType]=Seq(child.dataType)
  22. valinputStructType:StructType=child.dataTypematch{
  23. casest:StructType=>st
  24. case_=>StructType(Seq(StructField("col",child.dataType,child.nullable,Metadata.empty)))
  25. }
  26. overridedefcheckInputDataTypes():TypeCheckResult=TypeCheckResult.TypeCheckSuccess
  27. //参照org.apache.spark.sql.DataFrame.toJSON
  28. //参照org.apache.spark.sql.execution.datasources.json.JsonOutputWriter.writeInternal
  29. protectedoverridedefnullSafeeva l(data:Any):UTF8String={
  30. valwriter=newCharArrayWriter
  31. valgen=newJsonFactory().createGenerator(writer).setRootValueSeparator(null)
  32. valinternalRow=child.dataTypematch{
  33. case_:StructType=>data.asInstanceOf[InternalRow]
  34. case_=>InternalRow(data)
  35. }
  36. JacksonGenerator(inputStructType,gen)(internalRow)
  37. gen.flush
  38. gen.close
  39. valjson=writer.toString
  40. UTF8String.fromString(
  41. child.dataTypematch{
  42. case_:StructType=>json
  43. case_=>json.substring(json.indexOf(":")+1,json.lastIndexOf("}"))
  44. })
  45. }
  46. overridedefgenCode(ctx:CodeGenContext,ev:GeneratedExpressionCode):String={
  47. valwriter=ctx.freshName("writer")
  48. valgen=ctx.freshName("gen")
  49. valst=ctx.freshName("st")
  50. valjson=ctx.freshName("json")
  51. valtypeJson=inputStructType.json
  52. defgetDataExp(data:Any)=
  53. child.dataTypematch{
  54. case_:StructType=>s"$data"
  55. case_=>s"neworg.apache.spark.sql.catalyst.expressions.GenericInternalRow(newObject[]{$data})"
  56. }
  57. defformatJson(json:String)=
  58. child.dataTypematch{
  59. case_:StructType=>s"$json"
  60. case_=>s"""$json.substring($json.indexOf(":")+1,$json.lastIndexOf("}"))"""
  61. }
  62. nullSafeCodeGen(ctx,ev,(row)=>{
  63. s"""
  64. |com.fasterxml.jackson.core.JsonGenerator$gen=null;
  65. |try{
  66. |org.apache.spark.sql.types.StructType$st=${classOf[Json].getName}.getStructType("${typeJson.replace("\"","\\\"")}");
  67. |java.io.CharArrayWriter$writer=newjava.io.CharArrayWriter();
  68. |$gen=newcom.fasterxml.jackson.core.JsonFactory().createGenerator($writer).setRootValueSeparator(null);
  69. |org.apache.spark.sql.execution.datasources.json.JacksonGenerator.apply($st,$gen,${getDataExp(row)});
  70. |$gen.flush();
  71. |String$json=$writer.toString();
  72. |${ev.value}=UTF8String.fromString(${formatJson(json)});
  73. |}catch(Exceptione){
  74. |${ev.isNull}=true;
  75. |}finally{
  76. |if($gen!=null)$gen.close();
  77. |}
  78. """.stripMargin
  79. })
  80. }
  81. }
  82. objectJson{
  83. valstructTypeCache=collection.mutable.Map[String,StructType]()//[json,type]
  84. defgetStructType(json:String):StructType={
  85. structTypeCache.getOrElseUpdate(json,{
  86. println(">>>>>getStructTypefromjson:")
  87. println(json)
  88. DataType.fromJson(json).asInstanceOf[StructType]
  89. })
  90. }
  91. }


注册

注意,SQLContext.functionRegistry的可见性为protected[sql]

  1. val(name,(info,builder))=FunctionRegistry.expression[Json]("json")
  2. sqlContext.functionRegistry.registerFunction(name,info,builder)

测试

  1. valsubSchema=StructType(Array(
  2. StructField("a",StringType,true),
  3. StructField("b",StringType,true),
  4. StructField("c",IntegerType,true)))
  5. valschema=StructType(Array(
  6. StructField("x",subSchema,true)))
  7. valrdd=sc.makeRDD(Seq(Row(Row("12",null,123)),Row(Row(null,"2222",null))))
  8. valdf=sqlContext.createDataFrame(rdd,schema)
  9. df.registerTempTable("df")
  10. importsqlContext.sql
  11. sql("selectx,x.afromdf").show
  12. sql("selectx,x.afromdf").printSchema
  13. sql("selectjson(x),json(x.a)fromdf").show
  14. sql("selectjson(x),json(x.a)fromdf").printSchema



结果



  1. +----------------+----+
  2. |x|a|
  3. +----------------+----+
  4. |[12,null,123]|12|
  5. |[null,2222,null]|null|
  6. +----------------+----+
  7. root
  8. |--x:struct(nullable=true)
  9. ||--a:string(nullable=true)
  10. ||--b:string(nullable=true)
  11. ||--c:integer(nullable=true)
  12. |--a:string(nullable=true)
  13. >>>>>getStructTypefromjson:
  14. {"type":"struct","fields":[{"name":"a","type":"string","nullable":true,"metadata":{}},{"name":"b","type":"string","nullable":true,"metadata":{}},{"name":"c","type":"integer","nullable":true,"metadata":{}}]}
  15. >>>>>getStructTypefromjson:
  16. {"type":"struct","fields":[{"name":"col","type":"string","nullable":true,"metadata":{}}]}
  17. +------------------+----+
  18. |_c0|_c1|
  19. +------------------+----+
  20. |{"a":"12","c":123}|"12"|
  21. |{"b":"2222"}|null|
  22. +------------------+----+
  23. root
  24. |--_c0:string(nullable=true)
  25. |--_c1:string(nullable=true)



需要注意的点

  1. 使用SparkSQL自定义函数一般有两种方法,一种是使用开放的api注册简单函数,即调用sqlContext.udf.register方法。另一种就是使用SparkSQL内置函数的注册方法(本例就是使用的这种方法)。前者优势是开发简单,但是实现不了较为复杂的功能,例如本例中需要获取传入的InternalRow的StructType,或者需要实现类似 def fun(arg: Seq[T]): T 这种泛型相关的功能(sqlContext.udf.register的注册方式无法注册返回值为Any的函数)。
  2. 本例中实现genCode函数时遇到了困难,即需要在生成的Java代码中构建StructType对象。这个最终通过序列化的思路解决,先使用StructType.json方法将StructType对象序列化为String,然后在Java代码中调用DataType.fromJson反序列化为StructType对象。


编程开发网
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Spark WordCount 两种运行方式 下一篇Spark的基本工作原理

评论

帐  号: 密码: (新用户注册)
验 证 码:
表  情:
内  容:

array(4) { ["type"]=> int(8) ["message"]=> string(24) "Undefined variable: jobs" ["file"]=> string(32) "/mnt/wp/cppentry/do/bencandy.php" ["line"]=> int(217) }