设为首页 加入收藏

TOP

Livy : A REST Interface for Apache Spark
2018-11-21 16:24:37 】 浏览:43
Tags:Livy REST Interface for Apache Spark

官网:http://livy.incubator.apache.org/

Github:https://github.com/apache/incubator-livy

概述:

当前spark上的管控平台有spark job server,zeppelin,由于spark job server和zeppelin都存在一些缺陷,比如spark job server不支持提交sql,zeppelin不支持jar包方式提交,并且它们都不支持yarn cluster模式,只能用client的模式运行,这会严重影响扩展性。针对这些问题,cloudera研发了Livy,Livy结合了spark job server和Zeppelin的优点,并解决了spark job server和Zeppelin的缺点,下面是他们的对比。

Spark job server

Zeppelin

Livy

是否支持jar包提交

支持

不支持

支持

是否支持代码段提交

不支持

支持

支持

是否支持SparkContext重用和管理

支持

支持重用,不能stop

支持

运行模式

Client

Client

Client/Cluster

多个SparkContext

运行

在同一个JVM,可能存在问题

运行在不同JVM

运行在不同JVM

接口

RESTFul

RESTFul/WebSocket

RESTFul

SQL是否支持

不支持

支持

支持

下面是Livy架构图

client将在初始化时创建一个远程Spark集群,并通过REST api提交作业。Livy服务器将打开、包装这个job,并通过RPC发送到远程SparkContext,代码也将在那里执行。同时,client被阻塞,等待此job运行的结果。

Livy提供了三种模式去运行spark job:

  1. Using the Programmatic API,通过程序接口提交作业,需要继承com.cloudera.livy.Job接口编程,通过LivyClient提交
  2. 使用RESTAPI的session接口提交代码段方式运行
  3. 使用TESTAPI的batch接口提交jar包方式运行

配置Livy:

1.修改LIVY_HOME/conf/livy-env.sh,导入环境变量,cdh中的spark

export HADOOP_CONF_DIR=/etc/hadoop/conf

export SPARK_HOME=

/opt/cloudera/parcels/SPARK2-2.3.0.cloudera3-1.cdh5.13.3.p0.458809/lib/spark2/

export SPARK_CONF_DIR=

/opt/cloudera/parcels/SPARK2-2.3.0.cloudera3-1.cdh5.13.3.p0.458809/lib/spark2/conf

修改LIVY_HOME/conf/livy.conf

livy.server.host = 10.18.0.11

livy.spark.master = yarn

livy.spark.deploy-mode = cluster

从架构图可以看出,当Livy服务器出现故障时,所有连接的Spark集群也会终止,这意味着所有的作业和数据都会立即消失。在Livy0.3.0引入了一个会话恢复机制,以确保在Livy server不可用时Spark集群仍然有效,并且在重新启动Livy server后可以连接到现有的会话,并回滚到失败之前的状态。所以还应该设置以下3种配置:

# 默认:off,Livy server挂掉,所有session都会stop,session状态也会丢失

# recovery :Livy将会话信息持久化state-store。当Livy重新启动时,它会state-store恢复之前的会话。

livy.server.recovery.mode = recovery

# Livy会话状态存储的地方

#默认:<empty>, 不存储

# filesystem:存储到本地文件系统

# zookeeper:存储到zookeeper中

livy.server.recovery.state-store = zookeeper

#文件系统路径或zookeeper的host

livy.server.recovery.state-store.url = zkHost1:2181,zkHost2:2181,zkHost3:2181

二、初始化资源:

在yarn中初始化一个spark应用,持久化的占有一定的资源。有一个sc(SparkContext)和spark(SparkSession),类似于spark-shell,在代码块中直接使用即可。

以下使用python语言,使用REST API方式,其他的方式可以看官网

import requests, json, textwrap, pprint

"""
1.定义spark应用的资源情况
定义一个dict, 更多参数可以去官网或github上查看,类似于spark-submit
如果要导入自定义的模块,要放到hdfs上
"""
data = {"pyFiles":["hdfs://localhost:8020/***/lib.zip"], # list of string
 "executorMemory":"2G",
"executorCores":2,
"numExecutors":3
}
"""
问题:使用pyFiles添加依赖,好像没用,还是报模块找不到。
解决:在代码块中使用 sc.addPyFile(path), 必须在import 依赖的包之前,见下方代码块
"""

#2.发送请求,创建spark应用

livy_host = "http://10.18.0.11:8998"
headers = {"Content-Type":"application/json"}

#创建一个spark应用 POST livy_host/sessions
rp = requests.post(url=livy_host+"/sessions",data=json.dumps(data),headers=headers)

#查看spark应用状态 GET livy_host/sessions/{sessionId}
session_url = livy_host+rp.headers['location']
rp_se = requests.get(url=session_url,headers=headers)
pprint.pprint(rp_se.json())

#3.向spark应用提交代码块

#定义提交的代码块 在代码块中直接使用sc(SparkContext)、spark(SparkSession)
py_code=textwrap.dedent("""
#导入自定义module
#sc.addPyFile(path)
#from lib import *

#官网的例子  计算圆的Pi
import random
    NUM_SAMPLES = 100000
    def sample(p):
      x, y = random.random(), random.random()
      return 1 if x*x + y*y < 1 else 0

    count = sc.parallelize(xrange(0, NUM_SAMPLES)).map(sample).reduce(lambda a, b: a + b)
    print "Pi is roughly %f" % (4.0 * count / NUM_SAMPLES)
""")

body = {
"kind":"pyspark", #定义代码块中代码类型,spark、pyspark、sparkr,sql
"code":py_code
}

# POST livy_host/sessions/{sessionId}/statements
statements_url = session_url+"/statements"

#提交代码块
rp_st = requests.post(url=statements_url,data=json.dumps(body),headers=headers)

#查看程序运行状态 GET livy_host/sessions/{sessionId}/statements/{statementId}
result_url = livy_host+rp_st.headers['location']

flag = True
while flag:
        result = requests.get(url=result_url,headers=headers)
        result = result.json()
        if result['state']=="available":
            flag = False
        print(f"progress:{result['progress']}")
        time.sleep(1)
pprint.pprint(result)

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇SPARK的安装与部署 下一篇ubuntu12.04集群安装Spark

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目