// To define a custom partitioner, simply extend Partitioner and override two methods.
class MyPartitioner(val num: Int) extends Partitioner {
  // Defines the number of partitions.
  override def numPartitions: Int = ???

  // Defines the partitioning rule: maps a key to a partition index in [0, numPartitions).
  override def getPartition(key: Any): Int = ???
}
具体案例:
对 List 里面的单词进行 wordcount,并且按照每个单词的长度分区,输出到不同的文件里面
自定义partitioner如下:
// Custom partitioner: routes each key to a partition based on the length of
// its string representation, modulo the partition count.
class MyPartitioner(val num: Int) extends Partitioner {
  // Total number of partitions Spark will create.
  override def numPartitions: Int = num

  // Map a key to a partition index.
  override def getPartition(key: Any): Int = {
    val len = key.toString.length
    // Word length modulo the partition count; length is non-negative,
    // so the result is always a valid index in [0, num).
    len % num
  }
}
main方法:
object xy {
  def main(args: Array[String]): Unit = {
    // local[2]: run Spark locally with two worker threads.
    val conf = new SparkConf().setAppName("urlLocal").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val rdd1 = sc.parallelize(List("lijie hello lisi", "zhangsan wangwu mazi", "hehe haha nihaoa heihei lure hehe hello word"))
    // Word count: split each line into words, pair each word with 1, sum per word.
    val rdd2 = rdd1.flatMap(_.split(" ")).map(x => {
      (x, 1)
    }).reduceByKey(_ + _)
    // Repartition by word length with the custom partitioner, then write one
    // output file per partition.
    // NOTE(review): the ordering produced by sortBy is destroyed by the shuffle
    // that partitionBy triggers, so the sort only affects intra-partition order
    // incidentally — confirm whether it is needed at all.
    // (Removed: a dead `val rdd3` binding of the Unit result of saveAsTextFile,
    // and an identity `.mapPartitions(x => x)` that only added a useless stage.)
    rdd2.sortBy(_._2)
      .partitionBy(new MyPartitioner(4))
      .saveAsTextFile("C:\\Users\\Administrator\\Desktop\\out01")
    println(rdd2.collect().toBuffer)
    sc.stop()
  }
}