设为首页 加入收藏

TOP

Spark中repartition和partitionBy的区别
2019-01-06 01:29:12 】 浏览:23
Tags:Spark repartition partitionBy 区别

repartition 和 partitionBy 都是对数据进行重新分区,默认都是使用 HashPartitioner,区别在于partitionBy 只能用于 PairRdd,但是当它们同时都用于 PairRdd时,结果却不一样:

不难发现,其实 partitionBy 的结果才是我们所预期的,Why抓狂我们打开 repartition 的源码进行查看:

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
	coalesce(numPartitions, shuffle = true)
}

def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null)
      : RDD[T] = withScope {
    if (shuffle) {
      /** Distributes elements evenly across output partitions, starting from a random partition. */
      val distributePartition = (index: Int, items: Iterator[T]) => {
        var position = (new Random(index)).nextInt(numPartitions)
        items.map { t =>
          // Note that the hash code of the key will just be the key itself. The HashPartitioner
          // will mod it with the number of total partitions.
          position = position + 1
          (position, t)
        }
      } : Iterator[(Int, T)]

      // include a shuffle step so that our upstream tasks are still distributed
      new CoalescedRDD(
        new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
        new HashPartitioner(numPartitions)),
        numPartitions).values
    } else {
      new CoalescedRDD(this, numPartitions)
    }
}

注意到没有第15行没有,repartition 其实使用了一个随机生成的数来当做 Key,而不是使用原来的 Key!!

使用Spark的版本为1.6.1


编程开发网
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇查看Spark进程的JVM配置及内存使用 下一篇spark mllib 归一化

评论

帐  号: 密码: (新用户注册)
验 证 码:
表  情:
内  容:

array(4) { ["type"]=> int(8) ["message"]=> string(24) "Undefined variable: jobs" ["file"]=> string(32) "/mnt/wp/cppentry/do/bencandy.php" ["line"]=> int(214) }