Copyright notice: This is an original article by the blogger; reproduction without the blogger's permission is prohibited. https://blog.csdn.net/jiaoyangdetian/article/details/88102106
1 SparkSession
The SparkSession is now the single entry point for reading data, working with metadata, configuring the session, and managing cluster resources.
df = spark.read.format('json').load('py/test/sql/people.json')
df = spark.read.json('py/test/sql/people.json')
2 Tungsten
1> Main features in the first version:
Memory management and binary processing
Cache-aware computation: algorithms and data structures that exploit the memory hierarchy
Code generation
2> Improvements in the second version:
No virtual function dispatches: removes virtual-call overhead from the CPU's hot path.
Intermediate data in memory vs. CPU registers:
Tungsten Phase 2 places intermediate data into CPU registers. Obtaining data from CPU registers instead of from memory is an order-of-magnitude reduction in the number of cycles required.
Loop unrolling and SIMD
3> Process diagram: data → Plan → Model → RDD (optimization and execution stages)
4 Structured Streaming
This is the underlying foundation for building Structured Streaming. While streaming is powerful, one of the key issues is that streaming can be difficult to build and maintain.
The 2.x releases address this with a new, separate API built on Spark SQL that is better suited to event time, windowing, sessions, sources, and sinks.
5 Continuous applications
Altogether, Apache Spark 2.x not only unified DataFrames and Datasets but also unified streaming, interactive, and batch queries. This opens up a whole new set of use cases, including the ability to aggregate data into a stream and then serve it using traditional JDBC/ODBC, to change queries at run time, and/or to build and apply ML models across many scenarios in a variety of latency use cases:
Code:
import os
import re
os.environ['PYSPARK_PYTHON'] = '/usr/local/bin/python3.6'
import pyspark
# Create an RDD from a Python list
def sep1():
    sc = pyspark.SparkContext.getOrCreate()  # reuse any existing context
    data_list = [('Amber', 22), ('Alfred', 23), ('Skye', 4), ('Albert', 12),
                 ('Amber', 9)]
    data = sc.parallelize(data_list)
    # data= ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195
    return data
# Create an RDD from a file path
def sep2():
    file_path = '/Users/alisa/Desktop/StepRunner/pyspark/datas/VS14MORT.txt'
    sc = pyspark.SparkContext.getOrCreate()
    data_from_file = sc.textFile(file_path, 4)  # read into 4 partitions
    # data_from_file= /Users/alisa/Desktop/StepRunner/pyspark/datas/VS14MORT.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0
    row_data = data_from_file.take(1)
    # row_data= ['1 2101 M1087 432311 4M4 2014U7CN I64 238 070 24 0111I64 01 I64 01 11 100 601']
    print('row_data=', row_data)
    return row_data
# Create an RDD from a heterogeneous collection of types
def sep3():
    a = ('Ferrari', 'fast')         # tuple
    b = {'Porsche': 100000}         # dict
    c = ['Spain', 'visited', 4504]  # list
    sc = pyspark.SparkContext.getOrCreate()
    datas_rdd = sc.parallelize([a, b, c])  # one RDD over mixed element types
    # datas_rdd ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195
    data_heterogenous = datas_rdd.collect()  # collect back to the driver as one list
    # data_heterogenous= [('Ferrari', 'fast'), {'Porsche': 100000}, ['Spain', 'visited', 4504]]
    num = data_heterogenous[1]['Porsche']  # read a value out of the dict element
    # num= 100000
    return num
# Create an RDD from a list
#sep1()
# Create an RDD from a file path and read a row
#sep2()
# Create an RDD from a heterogeneous collection, converted via collect()
#sep3()