设为首页 加入收藏

TOP

Spark设置任务个数
2019-02-15 01:30:15 】 浏览:97
Tags:Spark 设置 任务 个数

今天使用spark读取hive的数据,然后保存到es,数据总共有,数据量1g左右,代码如下所示

package datasource
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark.sql.EsSparkSQL
object Data2ES {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    val sc = new SparkContext(conf)
    val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
    val options=Map(
      ("es.nodes", "192.168.111.75"),
      ("es.port", "9200"),
      ("es.index.auto.create", "true"),
      ("es.write.operation", "index")
    )
    val order_index=sqlContext.sql("select * from order_index")
 
    EsSparkSQL.saveToEs(order_index,"order_index_3/order_3",options)
    sc.stop()
  }
}

通过spark-submit提交任务,设置4个executor,每个executor有4个核,每个executor的内存为4g,命令如下

spark-submit --master spark://server3:7077 --executor-memory 4g --executor-cores 4 --total-executor-cores 16 --name qwm --class datasource.Data2ES /opt/jar/zjsm.jar

保存在HDFS上的数据分成了8个块,因此作业分成了8个小作业,任务运行了两个多小时。为提高任务的运行效率,打算将任务分成80个小任务。通过查询知道可以通过设置spark.default.parallelism参数或者spark.sql.shuffle.partitions参数(这个主要是针对spark sql设置的),因此将提交作业的命令改成下面的命令,但是发现任务数还是没有改变,仍然是8个任务

spark-submit --master spark://server3:7077 --executor-memory 4g --executor-cores 4 --total-executor-cores 16 --conf spark.sql.shuffle.partitions=80 --name qwm --class datasource.Data2ES /opt/jar/zjsm.jar

后来在https://stackoverflow.com/questions/38249624/how-to-change-partition-size-in-spark-sql这里看到另外一种设置任务数的方法

试了这种方式,仍然没有见效。仔细想了一下,应该可以通过改变分区数来控制任务个数,因此修改了代码,改变了分区数,使用这种方式确实改变了任务的个数

package datasource
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark.sql.EsSparkSQL
object Data2ES {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    val sc = new SparkContext(conf)
    val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
    val options=Map(
      ("es.nodes", "192.168.111.76"),
      ("es.port", "9200"),
      ("es.index.auto.create", "true"),
      ("es.write.operation", "index")
    )
    val order_index=sqlContext.sql("select * from order_index")
    val rdd=order_index.rdd.repartition(80)
    val schema=order_index.schema
    val df=sqlContext.createDataFrame(rdd,schema)
    EsSparkSQL.saveToEs(df,"order_index_3/order_3",options)
    sc.stop()
  }
}

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇2018-11-17 Spark介绍系列文章 下一篇SPARK 重命名DataFrame的列名

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目