Pyspark --- Spark2.x architecture
2019-03-15 01:05:46
Copyright notice: this is an original article by the author and may not be reproduced without permission. https://blog.csdn.net/jiaoyangdetian/article/details/88102106

1 SparkSession

The SparkSession is now the single entry point for reading data, working with metadata, configuring the session, and managing cluster resources.

# Two equivalent ways to read a JSON file into a DataFrame:
df = spark.read.format('json').load('py/test/sql/people.json')
df = spark.read.json('py/test/sql/people.json')
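Conceptually, `spark.read.json` treats its input as JSON Lines: one JSON object per line, each parsed into a row. A minimal plain-Python sketch of that per-line parsing (no Spark required; the sample records are hypothetical, modeled on Spark's example `people.json`):

```python
import json
import os
import tempfile

# Hypothetical JSON Lines content: one JSON object per line,
# mirroring the layout Spark's json reader expects.
sample = ('{"name": "Michael"}\n'
          '{"name": "Andy", "age": 30}\n'
          '{"name": "Justin", "age": 19}\n')

with tempfile.NamedTemporaryFile('w', suffix='.json', delete=False) as f:
    f.write(sample)
    path = f.name

# Roughly what the reader does: parse each line into a row (here, a dict).
with open(path) as fh:
    rows = [json.loads(line) for line in fh]
os.unlink(path)

print(rows[1]['name'])  # Andy
```

Spark additionally infers a unified schema across all rows (e.g. `age` becomes a nullable column), which this sketch omits.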

2 Tungsten

1> Main features of the first version:

Memory Management and Binary Processing

Cache-aware computation: algorithms and data structures that exploit the memory hierarchy

Code generation

2> Improvements in the second version:

No virtual function dispatches: eliminating per-operator virtual calls reduces CPU dispatch overhead.

Intermediate data in memory vs CPU registers:

Tungsten Phase 2 places intermediate data into CPU registers instead of memory. Obtaining data from CPU registers rather than from memory is an order-of-magnitude reduction in the number of cycles required (intermediate results are kept in registers instead of being cached in memory).

Loop unrolling and SIMD
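The effect of removing virtual dispatch can be sketched in plain Python (an illustration of the idea only, not Tungsten's actual generated code): whole-stage code generation fuses a chain of operators into one tight loop, so each row is processed without a per-operator call, and the intermediate value stays in a local variable rather than an intermediate buffer.

```python
data = list(range(10))

# Interpreted style: one dispatched call per operator, per row.
def chained(rows):
    ops = [lambda x: x + 1, lambda x: x * 2]
    out = []
    for r in rows:
        for op in ops:        # dispatch cost paid for every row and operator
            r = op(r)
        out.append(r)
    return out

# "Generated" style: the operators are inlined into a single loop body,
# keeping the intermediate value in a local (register-like) variable.
def fused(rows):
    return [(r + 1) * 2 for r in rows]

assert chained(data) == fused(data)
print(fused(data)[:3])  # [2, 4, 6]
```

The two produce identical results; the fused form simply removes the per-row, per-operator indirection that the first version pays.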

3> Diagram of the process from data, through plan and model, to RDD


[Figure: the pipeline from data through plan and model to RDD, covering the optimization and execution phases]

4 Structured Streaming

This is the underlying foundation for building Structured Streaming. While streaming is powerful, one of the key issues is that streaming applications can be difficult to build and maintain.

The current 2.x line improves on this by introducing a separate new API built on Spark SQL, better suited to handling event time, windowing, sessions, sources, and sinks.
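What event-time windowing computes can be shown with a plain-Python sketch (a conceptual stand-in, not the Structured Streaming API): events are bucketed by the timestamp they carry, not by when they arrive, so late data still lands in the right window. The event values and the 10-second tumbling window are illustrative.

```python
from collections import defaultdict

# (event_time_seconds, value) pairs; note the events need not arrive in order.
events = [(2, 'a'), (7, 'b'), (12, 'c'), (21, 'd'), (23, 'e')]
WINDOW = 10  # tumbling window size in seconds

counts = defaultdict(int)
for ts, _ in events:
    window_start = (ts // WINDOW) * WINDOW  # bucket by the event's own time
    counts[window_start] += 1

print(dict(counts))  # {0: 2, 10: 1, 20: 2}
```

In Structured Streaming the same grouping is expressed declaratively (a `groupBy` over a window of the event-time column) and the engine maintains the per-window state incrementally as new data arrives.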

5 Continuous applications

Altogether, Apache Spark 2.x not only unified DataFrames and Datasets but also unified streaming, interactive, and batch queries. This opens up a whole new set of use cases, including the ability to aggregate data in a stream and then serve it using traditional JDBC/ODBC, to change queries at run time, and to build and apply ML models across many scenarios with a variety of latency requirements.

Code:

import os

os.environ['PYSPARK_PYTHON'] = '/usr/local/bin/python3.6'

import pyspark

# Build an RDD from a Python list
def sep1():

    sc = pyspark.SparkContext()
    data_list = [('Amber', 22), ('Alfred', 23), ('Skye', 4), ('Albert', 12),
                 ('Amber', 9)]
    data = sc.parallelize(data_list)
    # data= ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195

    return

# Build an RDD from a file path
def sep2():

    file_path = '/Users/alisa/Desktop/StepRunner/pyspark/datas/VS14MORT.txt'
    sc = pyspark.SparkContext()
    data_from_file = sc.textFile(file_path, 4)   # read the file into 4 partitions
    # data_from_file= /Users/alisa/Desktop/StepRunner/pyspark/datas/VS14MORT.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0
    row_data = data_from_file.take(1)            # fetch the first line
    # row_data= ['1  2101  M1087 432311  4M4  2014U7CN  I64 238 070 24 0111I64 01 I64 01  11  100 601']
    print('row_data=', row_data)

    return

# Build an RDD from a collection of heterogeneous types, then collect it
def sep3():

    a = ('Ferrari', 'fast')          # tuple
    b = {'Porsche': 100000}          # dict
    c = ['Spain', 'visited', 4504]   # list

    sc = pyspark.SparkContext()
    datas_rdd = sc.parallelize([a, b, c])       # an RDD over mixed element types
    # datas_rdd ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195
    data_heterogenous = datas_rdd.collect()     # collect back to the driver as one Python list
    # data_heterogenous= [('Ferrari', 'fast'), {'Porsche': 100000}, ['Spain', 'visited', 4504]]
    num = data_heterogenous[1]['Porsche']       # read a value out of the collected data
    # num= 100000

    return

# Run only one of the following at a time: each function creates its own
# SparkContext, and only one SparkContext can be active per process.
# Build an RDD from a list
#sep1()
# Build an RDD from a file path and read one line
#sep2()
# Build an RDD from heterogeneous data and collect it
#sep3()
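As a final note on what transformations over sep1's pairs would compute, here is a plain-Python sketch of the per-key aggregation that a `reduceByKey(operator.add)` call would perform on that list (no Spark needed to see the semantics):

```python
from collections import defaultdict

# The same key/value pairs used in sep1 above.
data_list = [('Amber', 22), ('Alfred', 23), ('Skye', 4), ('Albert', 12),
             ('Amber', 9)]

# Equivalent of reduceByKey with addition: fold all values sharing a key.
totals = defaultdict(int)
for name, value in data_list:
    totals[name] += value

print(totals['Amber'])  # 31
```

In Spark the same result comes from `sc.parallelize(data_list).reduceByKey(operator.add).collect()`, with the reduction performed per-partition before shuffling.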

