设为首页 加入收藏

TOP

spark的介绍和pyspark的使用
2019-01-16 01:06:58 】 浏览:207
Tags:spark 介绍 pyspark 使用
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/dxyna/article/details/79772343

从这个名字pyspark就可以看出来,它是由python和spark组合使用的.

相信你此时已经电脑上已经装载了hadoop,spark,python3.

那么我们现在开始对pyspark进行了解一番(当然如果你不想了解直接往下翻找pyspark的使用):

1. 背景:

产生与加州大学伯克利分校AMP实验室,2013年6月称为Apache成为孵化项目,使用Scala语言进行实现的,而Scala建立在JAVA之上,

为什么要设计这么一个东西

为了改善Hadoop的MAP REDUCE的弱点:

1. 交互式和迭代式

2. 在集群多点内存中运行的分布式计算

3. 容错数据集合

为什么要用SPARK

1. 先进的大数据分布式编程和计算框架

2. 视图替代Hadoop(Spark可以独立与Hadoop,但是他不能替代Hadoop,因为Hadoop现在依然很重要)

3. 内存分布式计算:运行数度快

4. 可以使用不同的语言编程(java,scala,r 和python)

5. 可以从不同的数据源获取数据

可以从HDFS,Cassandea,HBase等等

同时可以支持很多的文件格式:text Seq AVRO Parquet

6. 实现不同的大数据功能:Spark Core,Sparc SQL等等

2. 主要部件

1.spark core :包含spark的主要基本功能,所有和Rdd有关的API都出自于spark core

2.spark sql :spark中用于结构话处理的软件包,用户可以在soark环境下使用sql语言处理数据

等等(其他先不介绍)

3. 介绍一下spark core

1.它是spark生态圈的核心:

负责读取数据

完成分布式计算

2.包含俩个重要部件

有向无环图(DAG)的分布式并行计算框架

容错分布式数据RDD(Resilient Distributed Dataset)

3.总体来说就是spark功能调度管理中心,用来定义和管理RDD,RDD代表了一系列数据集合分布在基质的内存中,spark core 的任务就是对这些数据进行分布式计算

4.RDD(重点):

弹性分布式数据集分布在不同的集群节点的内存中,可以理解为一大数组,数组的每一个元素就是RDD的一个分区,一个RDD可以分布并被运算在多态计算机节点的内存以及硬盘中,

RDD数据块可以放在磁盘上也可以放在内存中(取决于你的设置),如果出现缓冲失效或丢失,RDD分区可以重新计算刷新,RDD是不能被修改的但是可以通过API被变换生成新的RDD.

有俩类对RDD的操作(也成算子):

1.变换(懒执行): 有 map flatMap groupByKey reduceByKey 等

他们只是将一些指令集而不会马上执行,需要有操作的时候才会真正计算出结果

2.操作(立即执行): 有 count take collect 等

他们会返回结果,或者把RDD数据输出

这些操作实现了MapReduce的基本函数map,reduce及计算模型,还提供了filter,join,groupBYKey等,另外spark sql 可以用来操作有数据结构的RDD即SPARK DATA FRAME

它的运行原理和mapreduce是一样的,只是他们的运行方式不同,mr的运算是内存磁盘交互读写,不能在内存中共享数据,而RDD可以被共享和持久化.因为大数据运算经常是交互式和迭代式的,所以数据的重用性很重要,而mr的磁盘交互读写带来的I/O开销导致数度减慢



废话这么多了开始表演了!!


首先我们需要启动hadoop和spark

接下来在命令行输入:

jupyter-notebook --ip 192.168.50.129

--ip 后面跟的是你此时的ip,这样我们就会得到一个网址:

接下来我们复制它在浏览器上打开,就会进入jupyter的页面,我们通过点击new,python3来创建一个文件

首先我们需要导入py4j:

其实就是在python3里面导入了spark和sc模块


要注意了:下图红框里面的要对应你spark/python/lib里面的文件

import os
import sys
spark_name = os.environ.get('SPARK_HOME',None)
if not spark_name:
    raise ValueErrorError('spark环境没有配置好')
sys.path.insert(0,os.path.join(spark_name,'python'))
sys.path.insert(0,os.path.join(spark_name,'python/lib/py4j-0.10.4-src.zip'))
exec(open(os.path.join(spark_name,'python/pyspark/shell.py')).read())

现在我们就可以使用pyspark了:



请注意,spark在交互式shell下运行时候,这里的sc即SparkContext 的一个实例已经自动生成了,这是因为pyspark shell本身就是spark应用的driver程序,而driver程序包含应用的main函数定义RDD并在计算机集群上进行各种操作,所以一旦获得SparkContext object 即sc ,driver就可以访问spark了,因此sc可以看成是driver对计算机集群的连接.

spark里面的core里面的RDD有俩个组织,一个为driver另一个为worker,有点像hadoop里面的namenode和datanode,所以driver只能有一个而worker可以为多个.driver负责获取数据,管理worker,所以worker就负责工作.


有俩种类型的RDD:

1. 并行集合:来自与分布式化的数据对象,比如我们上面的代码,python里面的list对象,再比如用户自己键入的数据

并行化RDD就是通过调用sc的parallelize方法,在一个已经存在的数据集合上创建的(一个Seq对象),集合的对象将会被拷贝,创建出一个可以被并行操作的分布式数据集,比如上面的代码,演示了如何python中的list创建一个并行集合,并进行分行

2. 文件系统数据集读取数据

spark可以将任何hadoop所支持的存储资源转换称RDD,如本地文件(语言网络文件系统),索引的节点都必须能访问到,HDFS,mongodb,HBase,等

下面我们开始使用文件系统集读取数据,首先在hello里面上传一个文件,比如:


开始写代码:

lines = sc.textFile("hdfs://python2:9000/hello/data.csv")

既然我们已经获取了数据那就开始操作:

1.map()

他的参数是一个函数(支持lambda函数),函数应用于RDD的每一个元素,函数的参数只能有一个,返回值是一个新的RDD

2.flatMap()

参数是一个函数,函数应用为RDD的每一个元素,参数只有1个,将数据进行拆分,变成迭代器,返回值是一个新的RDD


如上图,可以进一步的将数据拆分出来,也可以进行添加一些别的操作,比如:


3.filter()

参数是一个函数,与python里面的filter一样,函数会过滤掉不符号和条件的元素,返回值是一个新的RDD

4.reduce()

并行汇总所有RDD元素,参数是一个函数,函数的参数有2个




5.countByValue()

各RDD元素在RDD中出现的次数


也可以像reduceByKey()变换每一组内汇总


6.reduceByKey()

在每一键组内汇总变换,查看每一个数据在文件里面出现的次数


7.sortByKey()

未完待续...

附链接:

spark RDD的使用



】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Spark与深度学习框架——H2O、dee.. 下一篇Spark介绍(一)简介

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目