
spark-job-server
Spark REST APIs include:

When Spark runs on YARN, check the running progress:
> yarn application -list | grep SPARK
Application-Id                      Application-Name  Application-Type  User       Queue                                   State    Final-State  Progress  Tracking-URL
application_1468582953415_1281732   Spark shell       SPARK             recsys     root.bdp_jmart_recsys.recsys_szda       RUNNING  UNDEFINED    10%       http://172.18.149.130:36803
application_1468582953415_1275117   sparksql          SPARK             mart_risk  root.bdp_jmart_risk.bdp_jmart_risk_hkh  RUNNING  UNDEFINED    10%       http://172.18.143.152:5396
> yarn application -status application_1468582953415_1275117    # check the status of a single application
Collect logs from YARN: yarn logs -applicationId <application ID>
This gathers an application's run logs, but they can only be viewed after the application has finished (YARN aggregates the logs only on completion), and log aggregation must be enabled (it is off by default) by setting yarn.log-aggregation-enable to true.

Spark job server

Deployment modes
There are four deployment modes in total; deploying to a cluster can be done in two ways:
Deploy job server to a cluster. There are two alternatives (see the deployment section):
  • server_deploy.sh deploys job server to a directory on a remote host.
  • server_package.sh deploys job server to a local directory, from which you can deploy the directory, or create a .tar.gz for Mesos or YARN deployment.

Spark on YARN deployment
Deployment
Download the spark-job-server source code, unpack it, and enter the extracted directory:
  1. Copy conf/local.sh.template to local.sh. Note: if you need to build against a different Spark version, change the SPARK_VERSION property.
(conf is now config, and it is a symlink pointing to job-server/config.)
  2. Copy config/shiro.ini.template to shiro.ini. Note: this step is only needed when authentication = on.
(There is currently no shiro.ini.template.)
  3. Copy config/local.conf.template to <environment>.conf.
Here <environment> is local, i.e. local.conf, which runs Spark in local mode; it can also be yarn. The <environment> config contains the web UI settings and related environment variables.
  4. Run bin/server_package.sh <environment>. This step packages job-server together with its configuration files and pushes everything to the configured remote server.
  5. In the deployment directory on the remote server, start the service by running server_start.sh; to stop it, run server_stop.sh.
(Note: by default, the user who runs server_start.sh is the user under which spark-job-server submits jobs to YARN.)
Check whether spark-job-server is running:
ps -ef | grep spark-job-server

Invocation

Build the jar
mvn clean package -DskipTests -Pproduct
Upload the jar
curl --data-binary @target/recommendation-model-1.0-SNAPSHOT.jar http://bds-test-004:8090/jars/test
Run the job
curl -d "source=spark_test.wangxiang_fact_user_product_rating" 'http://bds-test-004:8090/jobs?appName=test&classPath=com.jd.jddp.dm.model.ALSRecommend'
curl -d "source=spark_test.movielens,save_to=d_0e276318a87ced54171884ed765e9962.t_8dfe9c53a6cae3d5356984f799f0d685,rec_num=6,columns=user;product;rating" 'http://bds-test-004:8090/jobs?appName=test&classPath=com.jd.jddp.dm.model.ColdStart'
Check the execution status
Check the web UI (the status can also be queried through the REST API with GET /jobs/<jobId>; see the polling sketch below).

Contexts
Reference:
By default, every time a Spark job is submitted without a context parameter, the job server creates a new SparkContext, configured according to yarn.conf.
Looking at the API, you can create a custom context and then tell a job submission to use it:
GET /contexts - lists all current contexts
POST /contexts/<name> - creates a new context
DELETE /contexts/<name> - stops a context and all jobs running in it
PUT /contexts?reset=reboot - kills all contexts and re-loads only the contexts from config
> curl -X POST "http://bds-test-004:8090/contexts/test1"
Specify the context when submitting:
curl -d "source=spark_test.movielens,save_to=test.test123,rec_num=6,columns=user;product;rating" 'http://bds-test-004:8090/jobs?appName=baseAlgo&classPath=com.jd.jddp.dm.model.ALSRecommend&context=test-context2'

Execution result:
curl -d "source=spark_test.wangxiang_fact_user_product_rating,save_to=dtest.test234,rec_num=6,columns=userid;product_id;rating" 'http://bds-test-004:8090/jobs?appName=test&classPath=com.jd.jddp.dm.model.ColdStart'
{
"status": "STARTED",
"result": {
"jobId": "369670b9-ad45-454e-aeac-52e1265cf889",
"context": "feeaa22e-com.jd.jddp.dm.model.ColdStart"
}
}
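
The submission call returns immediately with STARTED and a jobId; the final status and result have to be fetched afterwards, either in the web UI or via GET /jobs/<jobId> on the REST API. Below is a minimal polling sketch in Scala, assuming the same host and port as in the examples above; the jobId is the one just returned, and matching on status strings such as RUNNING/FINISHED is a simplification rather than proper JSON parsing.

import scala.io.Source

// Minimal sketch: poll GET /jobs/<jobId> until the job is no longer running.
// Host, port and jobId are placeholders taken from the example above.
object PollJobStatus {
  def main(args: Array[String]): Unit = {
    val jobId = "369670b9-ad45-454e-aeac-52e1265cf889"
    val url   = s"http://bds-test-004:8090/jobs/$jobId"

    var running = true
    while (running) {
      val src  = Source.fromURL(url)
      val body = try src.mkString finally src.close()
      println(body)  // raw JSON from the job server: status, plus the result once the job has finished
      running = body.contains("\"RUNNING\"") || body.contains("\"STARTED\"")
      if (running) Thread.sleep(5000)
    }
  }
}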

Configuration that needs to be changed
settings.sh
Path settings
INSTALL_DIR=/export/App/job-server
SPARK_HOME=/export/App/spark-1.6.0-bin-hadoop2.6.1

yarn.conf
yarn.conf configures the Spark run mode, the job server itself, and the default SparkContext settings.

yarn.conf:
spark {
  # spark.master will be passed to each job's JobContext
  # master = "local[4]"
  # master = "mesos://vm28-hulk-pub:5050"
  # master = "yarn-client"
  master = "yarn-client"

  # Default # of CPUs for jobs to use for Spark standalone cluster
  # job-number-cpus = 4

  jobserver {
    port = 8090
    jar-store-rootdir = /tmp/jobserver/jars
    context-per-jvm = true
    jobdao = spark.jobserver.io.JobFileDAO
    filedao {
      rootdir = /tmp/spark-job-server/filedao/data
    }
  }

  context-settings {
    num-cpu-cores = 2      # Number of cores to allocate. Required.
    memory-per-node = 2G   # Executor memory per node, -Xmx style eg 512m, #1G, etc.

    spark.executor.instances = 4
    spark.cassandra.connection.host = 192.168.177.79
    spark.cassandra.auth.username = test
    spark.cassandra.auth.password = test123
    spark.cleaner.ttl = 3600
    # in case spark distribution should be accessed from HDFS (as opposed to being installed on every mesos slave)
    # spark.executor.uri = "hdfs://namenode:8020/apps/spark/spark.tgz"

    # uris of jars to be loaded into the classpath for this context. Uris is a string list, or a string separated by commas ','
    # dependent-jar-uris = ["file:///some/path/present/in/each/mesos/slave/somepackage.jar"]

    # If you wish to pass any settings directly to the sparkConf as-is, add them here in passthrough,
    # such as hadoop connection settings that don't use the "spark." prefix
    passthrough {
      #es.nodes = "192.1.1.1"
    }
  }

  # This needs to match SPARK_HOME for cluster SparkContexts to be created successfully
  # home = "/home/spark/spark"
}

# Note that you can use this file to define settings not only for job server,
# but for your Spark jobs as well. Spark job configuration merges with this configuration file as defaults.

akka {
  remote.netty.tcp {
    # This controls the maximum message size, including job results, that can be sent
    # maximum-frame-size = 10 MiB
  }
}

What needs to change in the Spark code:
Refer to the test code in the job server source:
import com.typesafe.config.Config
import org.apache.spark.SparkContext
import org.scalactic._
import scala.util.Try
import spark.jobserver.api.{JobEnvironment, SingleProblem, ValidationProblem, SparkJob => NewSparkJob}

object WordCountExampleNewApi extends NewSparkJob {
  type JobData = Seq[String]
  type JobOutput = collection.Map[String, Long]

  // runJob() does the actual work on the already-validated JobData
  def runJob(sc: SparkContext, runtime: JobEnvironment, data: JobData): JobOutput =
    sc.parallelize(data).countByValue

  // validate() turns the request config into JobData, or reports a validation problem
  def validate(sc: SparkContext, runtime: JobEnvironment, config: Config):
      JobData Or Every[ValidationProblem] = {
    Try(config.getString("input.string").split(" ").toSeq)
      .map(words => Good(words))
      .getOrElse(Bad(One(SingleProblem("No input.string param"))))
  }
}
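
The ALSRecommend and ColdStart jobs invoked through curl earlier presumably read their parameters from the same Typesafe Config that the job server builds from the POST body. Below is a hypothetical sketch of such a job, not the actual com.jd.jddp.dm.model code: the RecParams case class, the parsing, and the echo-style output are illustrative assumptions; only the parameter names (source, save_to, rec_num, columns) come from the curl calls above.

import com.typesafe.config.Config
import org.apache.spark.SparkContext
import org.scalactic._
import scala.util.Try
import spark.jobserver.api.{JobEnvironment, SingleProblem, ValidationProblem, SparkJob => NewSparkJob}

// Hypothetical parameter holder; field names mirror the curl examples above.
case class RecParams(source: String, saveTo: String, recNum: Int, columns: Seq[String])

object RecommendJobSketch extends NewSparkJob {
  type JobData = RecParams
  type JobOutput = Map[String, String]

  // The job server parses the -d body ("source=...,save_to=...,rec_num=6,columns=a;b;c")
  // as Typesafe Config, so the fields can be read directly in validate().
  def validate(sc: SparkContext, runtime: JobEnvironment, config: Config):
      JobData Or Every[ValidationProblem] = {
    Try(RecParams(
        source  = config.getString("source"),
        saveTo  = config.getString("save_to"),
        recNum  = config.getInt("rec_num"),
        columns = config.getString("columns").split(";").toSeq))
      .map(params => Good(params))
      .getOrElse(Bad(One(SingleProblem("missing or malformed job parameters"))))
  }

  // A real job would load `source`, train/score, and write to `saveTo`;
  // this sketch only echoes the parsed parameters back as the job result.
  def runJob(sc: SparkContext, runtime: JobEnvironment, data: JobData): JobOutput =
    Map(
      "source"  -> data.source,
      "save_to" -> data.saveTo,
      "rec_num" -> data.recNum.toString,
      "columns" -> data.columns.mkString(","))
}

Jar up a class like this, upload it with POST /jars as shown earlier, and submit it with classPath pointing at the object's fully qualified name.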

API



spark-job-server is built with sbt.
sbt download: http://www.scala-sbt.org/download.html (Ubuntu users can install it directly with apt-get).
spark-job-server is built on the Akka framework. The code that handles REST API requests is in WebApi.scala; the route that serves job information:
def jobRoutes: Route = pathPrefix("jobs") {
  // GET /jobs/<jobId>

The class that retrieves job information is JobStatusActor.scala.

The build is defined in Build.scala.
To change the Scala version:
scalaVersion := sys.env.getOrElse("SCALA_VERSION", "2.11.8"),
In addition, the Scala version also has to be changed accordingly in local.sh and the other shell scripts.
The Spark version is set in Versions.scala.
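The exact contents of Versions.scala differ between job server releases, but the pattern usually mirrors the sys.env.getOrElse trick used for scalaVersion above. A hypothetical sketch only; check the real file before editing:

// Hypothetical sketch, not the actual file: version constants as overridable vals.
object Versions {
  lazy val scala = sys.env.getOrElse("SCALA_VERSION", "2.11.8")
  lazy val spark = sys.env.getOrElse("SPARK_VERSION", "1.6.0")
}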

To build and package, run:
./server_package.sh local


After Spark is built with Scala 2.11, packaging fails with an error that the akka package cannot be found.