设为首页 加入收藏

TOP

大数据之   spark介绍和基本操作
2019-03-26 13:25:11 】 浏览:52
Tags:数据   spark 介绍 基本操作

spark是先进的大数据分布式编程和计算框架。试图替代hadoop,它是内存分布式计算,所以运行速度比磁盘读取式io流hadoop快100倍;spark的运行模式有批处理,流方式和交互方式

hadoop是离线式计算,spark可以实时计算

spark主要基本功能在SPARK CORE里,它是spark的调度中心,其中包括任务调动,

内存管理,容错管理及存储管理。同时也是一些列应用程序的集中地。包括两个重要部件,有向无环图DAG的分布式并行计算框架和容错分布式数据RDD。

pyspark是python语言编程的spark,可以在jupyter上运行,可以分为四个步骤:导包、配置conf、会话、sc.stop

本次练习用的是spark-2.3.0-bin-hadoop2.7,可以在apache官网下载,也可以在linux虚拟机pip install spark下载,下载完配置后刷新运行即可

#导入模块
import pyspark
#导入类
from pyspark import SparkContext,SparkConf
#创建配置,指定AppName,指定Master(主机)
conf = SparkConf(
        ).setAppName('demoRDD'
        ).setMaster('local[*]')

#创建会话
sc = SparkContext.getOrCreate(conf)
#通过会话实现对SPARK的操作
#关闭会话
sc.stop()


#map操作
#filter操作
#创建会话
sc = SparkContext.getOrCreate(conf)
#通过会话实现对SPARK的操作
#以python list提供一个测试用的数据
data = [x for x in range(11)]
rdd = sc.parallelize(data)
print('RDD对象记录数:',rdd.count())
print('RDD Collect结果:\n',rdd.collect())
'''
def noname(x):
    return x**3
'''
#map操作,映射
mappedRdd = rdd.map(lambda x:x**3)
print(mappedRdd.collect())

#判断一个数是否是偶数
def filterOdd(x):
    return x%2 == 0
#过滤操作
filtedRdd = mappedRdd.filter(filterOdd)
print(filtedRdd.collect())
print(rdd.collect())
#关闭会话
sc.stop()

#换一个写法
#map操作
#filter操作
#创建会话
sc = SparkContext.getOrCreate(conf)
#通过会话实现对SPARK的操作
#以python list提供一个测试用的数据
data = [x for x in range(11)]
rdd = sc.parallelize(data)
print('RDD对象记录数:',rdd.count())
print('RDD Collect结果:\n',rdd.collect())
#判断一个数是否是偶数
def filterOdd(x):
    return x%2 == 0
#map操作,映射
#过滤操作
listA = rdd.map(lambda x:x**3
            ).filter(filterOdd
            ).collect()
print(listA)
print(rdd.collect())
#关闭会话
sc.stop()

#使用文本文件做数据源
sc = SparkContext.getOrCreate(conf)
rows = sc.textFile("file:///Users/chuzhengkai/Desktop/test.txt")
print(rows.first())
print(rows.take(2))
print(rows.count())
print(rows.top(2))
sc.stop()

#使用多个文本文件
#进行词频统计
sc = SparkContext.getOrCreate(conf)
#多个文本文件获取到一个RDD里面
filesRDD = sc.wholeTextFiles('file:///Users/chuzhengkai/Desktop/*.txt')
#文件内容RDD
fileConRDD = filesRDD.map(lambda x:x[1])
#用回车符分隔字符串,形成列表
def sp(x):
    return x.split('\n')
#对每个文件内容做映射,结果是多个文件内容列表
#存在二维结构
strRDD = fileConRDD.map(sp)
#同样是映射,结果展平成一维结构
wordRDD = fileConRDD.flatMap(sp)
#结果,形成类一个元组表达一个文件,多个元组的列表
#词频统计map
wordDictRDD = wordRDD.map(lambda x:(x,1))

#Reduce
r = wordDictRDD.reduceByKey(lambda x,y:x+y)

#print(strRDD.collect())
#print(wordRDD.collect())
#print(wordDictRDD.collect())
print(r.collect())
sc.stop()

SparkContext建立spark与python对话。引入配置SparkConf
from pyspark import SparkContext,SparkConf
#配置    其中setmaster是链接到主机
conf=SparkConf().setAppName('demoPrj').setMaster('local[*]')
#创建会话
sc = SparkContext.getOrCreate(conf)
#用 python list 初始化
data =[1,2,3,4,5,6,7,8,9]
#并行化数据,创建出一个可以被并行操作的分布式数据集
rdd = sc.parallelize(data)
#collect 转成列表
print(rdd.collect())
#分割
print(rdd.getNumPartitions())
#glom返回一个rdd,一个数据集
print(rdd.glom().collect())
print(rdd.first())
sc.stop()

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Scala   WordCount 下一篇spark IDE:   System memory..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目