设为首页 加入收藏

TOP

spark的自定义partitioner
2019-01-19 13:26:16 】 浏览:64
Tags:spark 定义 partitioner
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq_20641565/article/details/76130724

在hadoop的mapreduce中默认patitioner是HashPartitioner,我们可以自定义Partitioner可以有效防止数据倾斜, 在Spark里面也是一样,在Spark里也是默认的HashPartitioner, 如果自己想自己定义Partitioner继承org.apache.spark里面的Partitioner并且重写它里面的两个方法就行了.

  • 模板如下:
//只需要继承Partitioner,重写两个方法
class MyPartitioner(val num: Int) extends Partitioner {

  //这里定义partitioner个数
  override def numPartitions: Int = 

  //这里定义分区规则
  override def getPartition(key: Any): Int = 
}
  • 具体案例:

对List里面的单词进行wordcount,并且输出按照每个单词的长度分区输出到不同文件里面

自定义partitioner如下:

class MyPartitioner(val num: Int) extends Partitioner {
  override def numPartitions: Int = num

  override def getPartition(key: Any): Int = {
    val len = key.toString.length

    //根据单词长度对分区个数取模
    len % num
  }
}
  • main方法:
object xy {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("urlLocal").setMaster("local[2]")
    val sc = new SparkContext(conf)

    val rdd1 = sc.parallelize(List("lijie hello lisi", "zhangsan wangwu mazi", "hehe haha nihaoa heihei lure hehe hello word"))

    val rdd2 = rdd1.flatMap(_.split(" ")).map(x => {
      (x, 1)
    }).reduceByKey(_ + _)

    //这里指定自定义分区,然后输出
    val rdd3 = rdd2.sortBy(_._2).partitionBy(new MyPartitioner(4)).mapPartitions(x => x).saveAsTextFile("C:\\Users\\Administrator\\Desktop\\out01")

    println(rdd2.collect().toBuffer)

    sc.stop()
  }
}
  • 结果:

因为这里定义的是4个partition 所以最后产生4个文件 如图:

这里写图片描述

其中part-00000 和 part-00001如下:

这里写图片描述

其中part-00002 和 part-00003如下:

这里写图片描述

其中part-00000中zhangsan的长度对4取模为0和这个文件中其他较短的单词一样,所以在一个分区, part-00003没有内容,说明上面的单词的长度对4取模结果没有为3的

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Spark踩坑系列3--Spark使用总结与.. 下一篇spark从入门到放弃四十三:Spark S..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目