

Spark to HBase
2019-02-15 01:47:38
Tags: spark, hbase

1) To write data from Spark into HBase we use the saveAsHadoopDataset method of PairRDDFunctions. This relies on an implicit conversion from an RDD of pairs to PairRDDFunctions; on older Spark versions this requires the import below (since Spark 1.3 these implicits live on the RDD companion object and are picked up automatically):

import org.apache.spark.SparkContext._

2) Under the hood, writing to HBase from Spark goes through org.apache.hadoop.hbase.mapred.TableOutputFormat, whose internal RecordWriter performs the actual writes into HBase.

It also reuses Hadoop's JobConf, configured the same way as for a MapReduce job.

3) The code below uses Spark SQL to read data from Hive, transforms it, and writes the result into HBase:

package savehbase

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}

object SaveHbase3 {
  def main(args: Array[String]): Unit = {
    // HBase job configuration: write through TableOutputFormat into the table "table2"
    val config = HBaseConfiguration.create()
    val jobConf = new JobConf(config)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, "table2")

    val sparkConf = new SparkConf().setAppName("ss").setMaster("local")
    val sc = new SparkContext(sparkConf)
    val hiveContext = new HiveContext(sc)
    hiveContext.setConf("spark.sql.shuffle.partitions", "3")

    // Read from Hive, turn each row into an HBase Put, and write via saveAsHadoopDataset
    hiveContext.sql("select id,name from hiv3.tablea")
      .rdd
      .map(row => {
        val id = row(0).asInstanceOf[Int].toString
        val name = row(1).asInstanceOf[String]

        // row key = id; column family "f1", qualifier "name"
        val put = new Put(Bytes.toBytes(id))
        put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("name"), Bytes.toBytes(name))

        (new ImmutableBytesWritable, put)
      })
      .saveAsHadoopDataset(jobConf)
  }
}
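The example above sticks with the old mapred API because saveAsHadoopDataset expects a JobConf. For reference, here is a minimal sketch (untested, same table name "table2") of the equivalent write through the newer mapreduce API and saveAsNewAPIHadoopDataset; pairRdd stands for the same RDD[(ImmutableBytesWritable, Put)] built above:

import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.hbase.mapreduce.{TableOutputFormat => NewTableOutputFormat}

// The configuration goes through a mapreduce Job instead of a JobConf
val newConf = HBaseConfiguration.create()
newConf.set(NewTableOutputFormat.OUTPUT_TABLE, "table2")
val job = Job.getInstance(newConf)
job.setOutputFormatClass(classOf[NewTableOutputFormat[ImmutableBytesWritable]])
job.setOutputKeyClass(classOf[ImmutableBytesWritable])
job.setOutputValueClass(classOf[Put])

// pairRdd.saveAsNewAPIHadoopDataset(job.getConfiguration)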

Problems encountered:


  1. Error: (26, 11) Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.

  2. The error is reported at the .map(row => { ... }) call on the DataFrame.

Checking the official Spark documentation, Dataset is described as follows:

Dataset is Spark SQL’s strongly-typed API for working with structured data, i.e. records with a known schema.

Datasets are lazy and structured query expressions are only triggered when an action is invoked. Internally, a Dataset represents a logical plan that describes the computation query required to produce the data (for a given Spark SQL session).

A Dataset is a result of executing a query expression against data storage like files, Hive tables or JDBC databases. The structured query expression can be described by a SQL query, a Column-based SQL expression or a Scala/Java lambda function. And that is why Dataset operations are available in three variants.

From this it is clear that to run such operations on a Dataset, a matching Encoder has to be provided. In particular, the example given in the official docs:

// No pre-defined encoders for Dataset[Map[K,V]], define explicitly
implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
// Primitive types and case classes can be also defined as
// implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()

// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()
// Array(Map("name" -> "Justin", "age" -> 19))

This shows that before calling map on a Dataset, an Encoder has to be defined.

Defining encoders everywhere would add a lot of work to the upgrade. Fortunately, a Dataset can also be converted back to an RDD, so the simpler fix is to change the earlier dataframe.map into dataframe.rdd.map.
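If keeping the Dataset API is preferred over dropping to an RDD first, another option is to provide an Encoder explicitly. Below is a minimal sketch (assuming Spark 2.x, where a DataFrame is a Dataset[Row]) that registers a Kryo-based Encoder for the (ImmutableBytesWritable, Put) pairs so that .map compiles directly on the DataFrame:

import org.apache.spark.sql.{Encoder, Encoders}

// Kryo encoder for the pair type produced by the map; without it the
// "Unable to find encoder" error above is raised
implicit val putPairEncoder: Encoder[(ImmutableBytesWritable, Put)] =
  Encoders.kryo[(ImmutableBytesWritable, Put)]

hiveContext.sql("select id,name from hiv3.tablea")
  .map(row => {
    val put = new Put(Bytes.toBytes(row(0).asInstanceOf[Int].toString))
    put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("name"), Bytes.toBytes(row(1).asInstanceOf[String]))
    (new ImmutableBytesWritable, put)
  })
  .rdd                      // saveAsHadoopDataset is still an RDD method
  .saveAsHadoopDataset(jobConf)

For a one-off job like this one, dataframe.rdd.map stays the simpler route, which is why the main example uses it.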
