设为首页 加入收藏

TOP

Spark随笔
2019-04-12 01:05:59 】 浏览:74
Tags:Spark 随笔

1.解决命令冲突

spark的启动与关闭命令与hadoop重复,直接调用可能调用的是hadoop,因此需要对spark的启动命令进行修改,将${spark_home}\sbin下的start-all.sh改为start-spark-start.sh。

2.spark web访问的页面

集群页面:http://${master_ip}:8080

单机页面:http://${slave_ip}:8080

3.提交spark任务

基本使用:spark-submit --master xx.class xx.jar input_path output_path

4.spark的运行模式

Local:①不指定master ②指定master:local[n] n为使用本地启动时使用n个cores,locan[*]使用本地所有的cores。

standalone:spark本身提供的集群模式:--master spark://host:port

yarn: 统一资源调度平台,--master yarn

mesos:类似yarn资源调度平台 --master mesos://host:port

5.使用spark-shell

先打开hadoop,后可直接在${spark_home}/sbin下启动spark-shell命令,可用--master指定启动模式。

6.spark版本的wordcount(scala)

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("wordcount")
    val sc = new SparkContext(conf)
    val words = sc.makeRDD(List("This is a good day","my name is pxd","good good study","day day up"))
    val rs: RDD[(String, Int)] = words.flatMap(_.split(" ")).map((_,1)).reduceByKey(_ + _)
    rs.collect().map(t=>{println(t._1 ,t._2)})
  }
}

7.spark版本的wordcount(java

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;

public class WordCountJava {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("wordCountJava");
        conf.setMaster("local");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        ArrayList<String> word = new ArrayList<>();
        word.add("This is a good day");
        word.add("my name is pxd");
        word.add("good good study");
        word.add("day day up");
        JavaRDD<String> wordRdd = jsc.parallelize(word);
        //压扁
        JavaRDD<String> wordLines = wordRdd.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" ")).iterator();
            }
        });
        //转为(key,1)
        JavaPairRDD<String, Integer> pairRDD = wordLines.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {

                return new Tuple2<>(s, 1);
            }
        });
        //累加
        JavaPairRDD<String, Integer> result = pairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        //排序
        JavaPairRDD<Integer, String> swap = result.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            @Override
            public Tuple2<Integer, String> call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                return stringIntegerTuple2.swap();
            }
        });
        JavaPairRDD<Integer, String> resverfinal = swap.sortByKey(false);
        JavaPairRDD<String, Integer> resultLast = resverfinal.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<Integer, String> integerStringTuple2) throws Exception {
                return integerStringTuple2.swap();
            }
        });
        resultLast.saveAsTextFile("");
        jsc.stop();
    }
}

8.RDD的创建

三种方式:

①集合并行化:sc.makeRDD(List[T])、sc.parallelize(List[t])

②读取外部文件系统:sc.textFile([File_Path/Hdfs_Path])

③调用转换类的算子,每次调用transformation算子都会重新生成一个RDD,RDD中的数据类型是由传入的函数决定的。

9.RDD分区

在spark-shell中查看分区使用rdd.partitions.size,分区的数量默认等于运行任务的cores数量,也可以自定指定分区数量。

10.读取外部RDD的分区

通过sc.textFile读取外部文件的时候,外部文件若在hdfs上,有几个block块就有几个分区(分区的数量至少有两个),也可以自己在通过sc.textFile("path",numPatitions)来指定分区数量。

11.通过转换类算子查看分区

转换类的算子一般清空下,分区不变

12.RDD的算子

RDD的算子可以分成两类:①transformation(转换类算子) laze类型 ②action,不会形成新的RDD,但是会开始执行前面RDD链条所有的算子。

13.部分Transformation算子

map

对某一个RDD指定,里面的每条数据都执行。

总结:

1.map是一一映射,RDD中有几条数据,就会被迭代几次。

2.Map的返回值类型,还是由输入函数的类型决定的。

3.RDD的分区是不变的。

mapValues

作用于RDD的v,RDD[key,value]中的key可以保持不变。

MapValue得到RDD之后,分区也是不变的。

flatMap

flatMap = map + flattern(得到新的RDD的分区不变)

mapPatitions

package Transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object MapPartitions {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("transformTest")
    conf.setMaster("local[3]")
    val sc = new SparkContext(conf)
    val maprdd: RDD[Int] = sc.makeRDD(List(1,3,2,5,7,8),3)
    val rdd2: RDD[Int] = maprdd.mapPartitions(t => {
      val threadName: String = Thread.currentThread().getName
      println("Thread:" + "====" + threadName)
      t.map(_ * 10)
    })
    rdd2.collect()

  }
}

zipWithIndex

package Transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object MapPartitions {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("transformTest")
    conf.setMaster("local[3]")
    val sc = new SparkContext(conf)
    val maprdd: RDD[Int] = sc.makeRDD(List(1,3,2,5,7,8),3)

    val rdd2: RDD[(Int, Long)] = maprdd.zipWithIndex()
    rdd2.collect().map(println)

  }
}

mapPartitionsWithIndex

定义:带索引的分区

package Transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object MapPartitionsWithIndex {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("transformTest")
    conf.setMaster("local[3]")
    val sc = new SparkContext(conf)
    val maprdd: RDD[Int] = sc.makeRDD(List(1,3,2,5,7,8),3)
    val rdd2: RDD[Unit] = maprdd.mapPartitionsWithIndex((i: Int, j: Iterator[Int]) => {
      j.map(t => {
        val threadName: String = Thread.currentThread().getName
        println("thread:" + threadName + ",partitions:" + i)
      })
    })
    rdd2.collect()
  }
}

groupBy,groupByKey,reduceByKey

package Transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object AllBy {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("allByTest")
    conf.setMaster("local")
    val sc = new SparkContext(conf)
    val rdd1: RDD[Int] = sc.makeRDD(List(1,4,5,6))
    //groupBy
    val rddGroupBy: RDD[(Int, Iterable[Int])] = rdd1.groupBy(t=>t)
    val rdd2: RDD[(String, Int)] = sc.makeRDD(List(("reba", 1000), ("yangmi", 3000), ("fengjie", 9000), ("baby", 6000), ("reba", 3000), ("yangmi", 8000)), 3)
    //按名字分类GroupBy
    val rdd3: RDD[(String, Iterable[(String, Int)])] = rdd2.groupBy(_._1)
    //计算每人的总片酬
    val rdd4: RDD[(String, Int)] = rdd3.mapValues(_.map(_._2).sum)
    //groupByKey
    val rdd5: RDD[(String, Iterable[Int])] = rdd2.groupByKey()
    val rdd6: RDD[(String, Int)] = rdd2.reduceByKey(_+_)
  }
}

groupByKey和reduceByKey的比较

相同点:

1.都会作用于RDD[k,v]

2.都是根据key来分组

3.二者的默认分区都不变

不同点:

①groupByKey默认没有聚合函数,返回值类型RDD[k,Iterator[v]]

②reduceByKey:必须传聚合函数,得到的返回值类型RDD[k,集合之后的v]

如果两个算子都能使用,优先使用reduceByKey

sortBy,sortByKey

都是排序,sortBy是根据自定义的条件进行排序,sortByKey是根据key来排序。

sortBy

sortByKey

Filter

过滤出所有满足的条件,即使分区没有数据了,最后的分区个数不变。

集合中的交集,并集,差集

并集:union:生成一个RDD,分区是两个集合的分区之和,分区中的元素不变。

交集:intersection:生成新的RDD分区以多的RDD为准。

差集:subtract:生成的分区,以集合在前的分区为准。

Distanct(去重):类型指定,顶层调用:

map(x,null).reduceByKey(x,y)=>x.numpartitions.map(_.1)

Reduce:归并的,聚合的,得到的顺寻是很不确定的用(_ ++ _)

top(n):取集合的前n个元素。

14.部分action算子

collect和collectAsMap,两者都是收集,不同的是collect返回类型是Array,collectAsMap返回类型是Map。

15.spark和mr的区别

spark基于内存进行计算,mr是基于磁盘的。DAG:有向无环图,中间过程不落地。

16.spark部分构件:

Application SparkContext Master Worker Executor TaskSchedular DagSchedular

ShuffleMapTask ResultTask

17.RDD简介

就是一个基本的抽象,操作一个RDD就当操作一个本地集合一样,降低了编程的复杂度,RDD中根本就不存在真正要处理的数据,而是记录RDD的转换关系。

RDD变换:返回指向新RDD的指针,在RDD之间创建依赖关系,每个RDD都有计算函数和指向父RDD的指针。

内部包含的5个主要属性:

①分区列表

②针对每个split的主要函数

③对其他rdd的依赖列表

④可选,如果是keyValueRDD,可以带分区类。

⑤可选,首选块位置列表(hdfs block location)

RDD Transformation
------------------
	返回指向新rdd的指针,在rdd之间创建依赖关系。每个rdd都有计算函数和指向父RDD的指针。

	map()		 //对每个元素进行变换,应用变换函数 (T)=>V
	filter()	  //过滤器,(T)=>Boolean
	flatMap()	   //压扁,T => TraversableOnce[U]
	mapPartitions()		//对每个分区进行应用变换,输入的Iterator,返回新的迭代器,可以对分区进行函数处理。Iterator<T> => Iterator<U>
	mapPartitionsWithIndex(func)	//同上,(Int, Iterator<T>) => Iterator<U>
	sample(withReplacement, fraction, seed)	    //采样返回采样的RDD子集。withReplacement 元素是否可以多次采样,fraction : 期望采样数量.[0,1]
	union()			//类似于mysql union操作。select * from persons where id < 10 union select * from id persons where id > 29 ;
	intersection		    //交集,提取两个rdd中都含有的元素。
	distinct([numTasks]))		//去重,去除重复的元素。
	groupByKey()			//(K,V) => (K,Iterable<V>)
	reduceByKey(*)			  //按key聚合。 
	aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])		//按照key进行聚合
	key:String U:Int = 0
	sortByKey			//排序
	join(otherDataset, [numTasks])		//连接,(K,V).join(K,W) =>(K,(V,W)) 
	cogroup					//协分组  (K,V).cogroup(K,W) =>(K,(Iterable<V>,Iterable<!-- <W> -->)) 
	cartesian(otherDataset)			//笛卡尔积,RR[T] RDD[U] => RDD[(T,U)]
	pipe				//将rdd的元素传递给脚本或者命令,执行结果返回形成新的RDD
	coalesce(numPartitions)			//减少分区
	repartition				//可增可减
	repartitionAndSortWithinPartitions(partitioner)    //再分区并在分区内进行排序
RDD Action
------------------
	collect()						//收集rdd元素形成数组,会将所有的executor拉取到driver端口,尽量不要使用。
	count()							//统计rdd元素的个数
	reduce()						//聚合,返回一个值。
	first							//取出第一个元素take(1)
	take							
	takeSample (withReplacement,num, [seed])
	takeOrdered(n, [ordering])
	saveAsTextFile(path)			//保存到文件
	saveAsSequenceFile(path)		//保存成序列文件
	saveAsObjectFile(path) (Java and Scala)
	countByKey()					//按照key,统计每个key下value的个数.

分区列表:RDD的数据集的基本组成单位,每个RDD都有一到多个分区,每个分区的数据,RDD都会记录存储在哪里。

getPartitions方法:
protected def getPartitions:Array[Partition]    //获取分区信息

computer方法:

@DeveloperApi
def compute(split:Partition,context:TaskContext):Iterator[t]

MapPatitionsRDD里面的compute方法:
override def compute(split:Partition,context:TaskContext):Iterator[U]=f(context,split.index,firstParent[T].iterator(split,context))
把父RDD中的数据,组成iterator,然后直接传入函数中,继续调用,父RDD中数据到子RDD中数据的逻辑转换,凭的就是compute方法。

RDD的依赖关系

RDD之间的依赖关系,分为两种:窄依赖(Narrow Dependency)和宽依赖(Shuffle Dependcy)。

窄依赖:一个RDD对他的父RDD只有简单的一对一的依赖关系,也就是说每个RDD的partition,仅仅是依赖于父RDD的一个partition。

宽依赖:本质上就是经历了shuffle的RDD,每个父RDD中的partition的数据都会传输到下一个RDD的partition中,此时就会出现子RDD和父RDD的错综复杂的关系,这就是宽依赖。

可选的分区器

在RDD<key,value>类型上才有分区器,RDD[key]的分区器为NONE。

两种分区器:第一种,HashPartitioner key的hashcode除分区数量取余。

第二种,RangePartitioner key的范围,分区的数量。

可选的最佳位置

理念:移动数据不如移动计算,优先在有数据的节点上启动计算。

RDD流向图:

18.逻辑上的任务调度流程:

Application --> job --> stage --> task

当我们在执行main方法的时候,并没有真正的执行程序,driver端记录了RDD之间的依赖关系,每个RDD传递了什么函数,记录RDD从何处读取数据。

当触发action算子的时候,才真正开始执行。

Dag --> 切分stage --> 组装task --> 把task交给executor去执行。

19.idea快捷键: ctrl + alt + B 看子类 ctrl + H 查看类的层级结构

20.依赖关系:RDD之间依赖关系,父子RDD之间的依赖,这种关系又叫做lineage(血缘),在源码中,Dependency父类有两个子类:NarrowDependency,ShuffleDependcy

依赖产生背景:

1.计算任务,宽依赖会进行shuffle,效率慢,而窄依赖则类似于流水线作业。

2.失败恢复:窄依赖更高效,宽依赖效率低。

3.spark的stage划分。

21.依赖和stage

宽依赖会切分stage,一些列的窄依赖都会在一个stage中,假设一个job中有2个shuffle类的算子,此时会有(2+1)个stage。

stage只有两类:shuffleMapStage和ResultStage,一个job中只有一个resultStage,其余都是shuffleMapStage。

宽/窄依赖的概念不止用在stage划分中,对容错也很有用,RDD的Lineage会记录RDD的元数据和转换行为,当该RDD的部分分区数据丢失时,他可以根据该这些信息来重新运算和恢复丢失的数据分区,Dependency代表了RDD之间的依赖关系,即血缘(Lineage)。

发生错误的时候,如果是窄依赖,只需要拿到父RDD中的分区数据和函数业务,重新执行,恢复速度快,如果是宽依赖,则必须拿到父RDD中的所有分区数据和函数业务,恢复起来因为走IO,修复速度慢。

22.如果executor挂掉了怎么办

Driver会通知master启动,重新执行,如果再次出错,diver会将任务分发给其他的executor中执行,容错机制,默认情况下会执行3次。

23.DAG

DAG(Directed Acyclic Graph)叫做有向无环图,指任意一条边有方向,且不存在环路的图。

在spark中,每一个操作生成一个RDD,RDD之间连一条线,最后这些RDD和他们之间的线组成一个有向无环图,这个就是spark的DAG,原始的RDD通过一些列的转换就形成了DAG,根据RDD之间的依赖关系的不同将DAG划分为不同的stage,对于窄依赖,partition的转换处理在stage中完成计算,对于宽依赖,由于shuffle的存在,只能在partitionRDD处理完成后,才能开始接下来的计算,因此宽依赖是划分stage的依据。

宽依赖是划分stage的表示。

总结:

DAG有向无环图,代表RDD的转换过程,其实就是代表着数据的流向。

DAG是边界的,有开始有结束,通过SparkContext创建RDD开始的,触发action就会生成一个完整的DAG,DAG又会被切分为多个stage(阶段),切分的依据就是宽依赖(ShuffleDependency),先会提交前面的stage,然后提交后面的stage,一个stage中有多个Task,多个Task可以并行执行。

val wordAndOne = words.map((_,1))  //MapPartitionsRDD
val result = wordAndOne.reduceByKey(_+_) // ShuffledRDD
result.saveAsTextFile("hdfs_path") // MapPartitonsRDD
// println(result.toDebugString)
result.collect()//此DAG从sc.texFile开始的,计算RDD会从sc.textFile开始。

24.spark运行的任务机制

前三个阶段,构建DAG,切分stage,组装task在driver端进行,最后一个阶段,执行我们写的代码,在executor中执行的。

sparkContext中源码:
private var _schedulerBackend:SchedulerBackend = _
private var _taskScheduler:TaskScheduler = _
@volatile private var _dagScheduler:DAGScheduler = _

DagScheduler:负责切分stage,提交stage,把TaskSet提交给TaskScheduler去处理。
TaskScheduler:负责调度task,把task交给executor去执行。
ScheluerBackend:负责通信,在调用的时候真正执行的是StandloneSchedulerBackend,负责executor和master进行通讯。

Driver会对我们的业务逻辑程序进行解析,记录RDD之间的依赖关系,记录RDD的数据信息所在的位置,记录算子之间的逻辑,但是driver端只是调度和记录,不会执行我们的程序,当driver解析到我们的action的时候会生成job。

24.spark运行流程图

25.shuffle的流程

优化前:

优化后

26.sortBy

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Scala   WordCount 下一篇spark架构及原理

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目