设为首页 加入收藏

TOP

基于Oozie实现MapReduce作业的自动提交功能
2019-05-12 00:14:10 】 浏览:80
Tags:基于 Oozie 实现 MapReduce 作业 自动 提交 功能

Oozie是一个Hadoop工作流管理系统。OozieClient RestFul API官方参考如下。它提供了JAVA API 和 RESTFul API 两种形式使用Oozie客户端可以向Oozie服务端提交workflow。

workflow即工作流,在Oozie中使用workflow来配置各种类型的动作节点,如MapReduce类型的action,这些action就是完成具体功能的程序。而workflow负责组织各个动作节点,形成有向无环图。

对于用户而言,传统的MapReduce程序的提交运行过程如下:

a) 将编写好的模型打包成jar文件并上传到Hadoop集群的HDFS上;

b) 根据具体的作业配置好程序的输入目录和输出目录;

c) 使用hadoop jar 命令提交计算作业。

采用Oozie之后,可以借助Oozie来负责命令的提交。下面介绍使用Oozie RESTFul API 提交作业。

使用Oozie提交作业的流程如下:

a) 用户先提前将作业jar包以及相应的workflow.xml配置文件上传到HDFS上的合适目录中

b)运行ooziejob -oozie ..... -run 命令或者通过编程实现Oozie RestFul API提交作业。

Oozie的提交原理简介

当Oozie Server接收到用户的提交请求后,Oozie Server解析本次作业的workflow.xml文件,执行工作流,并启动一个特殊的mapreduce作业(没有reducer),由这个特殊的mpareduce作业负责真正地提交Hadoop作业执行(如MapReduce的wordcount)。正是由于Oozie Server通过启动一个特殊的MR作业来向Hadoop集群提交作业,事先是不知道这个特殊的MapReduce作业运行在哪个机器上的,因此这也是为什么需要提前将写好的程序jar文件以及配置文件workflow.xml先上传到HDFS上的原因。


作业提交的生产者--消费者模型

当有许多作业进行提交时,可以使用生产者--消费者模型来管理这些作业。

本生产者--消费者的设计思路如下:

当向Oozie Server提交一个作业之后,会返回一个jobId,生产者负责将jobId添加到链表中,交将作业的一些基本的静态信息保存到数据库中;而消费者则从链表中取出jobId,并根据此jobId来查询本次作业的执行时间、执行状态等信息。以此为基础,可以统计出所有向Oozie Server提交的作业。

提交系统提供了两种方式URI给用户进行作业提交:一种是Servelet实现,一种是WebService实现。

用户只需要通过发送HTTP请求,携带用户名和AppPath(提前配置好的上传到HDFS中的作业lib文件夹所在目录)两个参数,就可以完成作业的提交。用户名可以进行一定的权限验证,而AppPath则是告诉Oozie Server待执行的作业在哪里,Oozie Server根据AppPath找到作业,生成工作流,然后将作业提交给Hadoop计算,并监控作业直到完成。

作业提交的部分代码如下:

String jsonJobId = oozieClient.submitJob(userName, appPath);//submit job to oozie server
jobId = Parse.getJobId(jsonJobId);//get job id
System.out.println("job " + jobId + " submited at " + new Date());
		
//start producer thread
new Thread(new AddJob(this)).start();
//start consumer thread
new Thread(new JobStatistic(oozieClient)).start(); 

由于每一个作业都有一个唯一的作业ID,故生产者线程先将本次作业提交相关信息保存到数据库中,然后再将作业ID添加到生产者-消费者队列中(一个LinkedList)。生产者的部分代码如下:

String jobId = schedulerAction.getJobId();
String userName = schedulerAction.getUserName();
String appPath = schedulerAction.getAppPath();
long inputSize = CommonUtil.getFileSizeWithSplitPath(appPath, userName);
try{
	db.insertJobProperty(userName, appPath, inputSize, jobId);
	System.out.println("insert into mysql success");
}catch(SQLException e){
		e.printStackTrace();
		System.out.println("SQL Exeception");
	}
	//after producer insert job property into mysql, add jobId to LinkedList.so consumer always update successful.
	addId(jobId);//as producer thread add jobId in LinkedList.
其中inputSize是本次作业运行时所处理的输入大小。这可以通过WebHDFS访问输入目录获得。

消费者负责从LinkedList中取出jobId,并根据此jobId向Oozie Server发送HTTP请求查询当前作业的执行状态以及作业执行完成后所花的时间。消费者部分代码如下:

String jobId = null;
	synchronized (jobIds) {
		try {
			if (jobIds.size() == 0)
				jobIds.wait();//LinkedList is empty
			jobId = jobIds.removeLast();
			System.out.println("remove a job from LinkedList");
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}// end sync

try {
		db.updateRunTime(runTime, jobId);
		System.out.println("update job run Time success");
	} catch (SQLException e) {
		e.printStackTrace();
	}

Client向Oozie Server发送HTTP请求通过 Apache HTTP Client 包来实现。

需要改进的地方:

1,每来一个提交请求时,就会启动二个线程,一个是生产者线程另一个是消费者线程。而对于消费者线程而言,可以预先启动起来,作为后台线程一直在运行,负责查询作业的执行情况。而不是一次请求就启动一个消费者线程。

2,一个提交请求会触发两次写数据库。一次发生在生产者将作业提交的相关信息写入数据库,一次发生在消费者将作业运行相关信息写入数据库。可以先将这些信息缓存在内存中,当提交的请求次数达到一定程度时,再统一写入数据库。从而减少访问数据库的次数。







】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Hadoop之 HDFS通信协议 与 HDFS体.. 下一篇大数据之_Hadoop工作笔记002---Sp..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目