设为首页 加入收藏

TOP

Hadoop Streaming的一些基本知识
2019-02-09 00:42:45 】 浏览:53
Tags:Hadoop Streaming 一些 基本知识

Streaming工作原理:

在上面的例子里,mapper和reducer都是可执行文件,它们从标准输入读入数据(一行一行读), 并把计算结果发给标准输出。Streaming工具会创建一个Map/Reduce作业, 并把它发送给合适的集群,同时监视这个作业的整个执行过程

如 果一个可执行文件被用于mapper,则在mapper初始化时, 每一个mapper任务会把这个可执行文件作为一个单独的进程启动。 mapper任务运行时,它把输入切分成行并把每一行提供给可执行文件进程的标准输入。 同时,mapper收集可执行文件进程标准输出的内容,并把收到的每一行内容转化成key/value对,作为mapper的输出。默认情况下,一行中第一个tab之前的部分作为key,之后的(不包括tab)作为value。 如果没有tab,整行作为key值,value值为null。不过,这可以定制,在下文中将会讨论如何自定义key和value的切分方式。

如果一个可执行文件被用于reducer,每个reducer任务会把这个可执行文件作为一个单独的进程启动。 Reducer任务运行时,它把输入切分成行并把每一行提供给可执行文件进程的标准输入。同时,reducer收集可执行文件进程标准输出的内容,并把每一行内容转化成key/value对,作为reducer的输出。默认情况下,一行中第一个tab之前的部分作为key,之后的(不包括tab)作为value。在下文中将会讨论如何自定义key和value的切分方式。

这是Map/Reduce框架和streaming mapper/reducer之间的基本通信协议。

用户也可以使用java类作为mapper或者reducer。上面的例子与这里的代码等价:

$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
    -input myInputDirs \
    -output myOutputDir \
    -mapper org.apache.hadoop.mapred.lib.IdentityMapper \
    -reducer /bin/wc

任何可执行文件都可以被指定为mapper/reducer。这些可执行文件不需要事先存放在集群上; 如果在集群上还没有,则需要用-file选项让framework把可执行文件作为作业的一部分,一起打包提交。例如:

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
-input myInputDirs \
-output myOutputDir \
-mapper myPythonScript.py \
-reducer /bin/wc \
-filemyPythonScript.py

Hadoop streaming框架默认情况下会以'\t'作为分隔符,将每行第一个'\t'之前的部分作为key,其余内容作为value,如果没有'\t'分隔 符,则整行作为key;这个key\tvalue对又作为reduce的输入。hadoop 提供配置供用户自主设置分隔符。
-D stream.map.output.field.separator \#设置map输出中key和value的分隔符
-D stream.num.map.output.key.fields \#设置map程序分隔符的位置,该位置之前的部分作为key,之后的部分作为value
-D map.output.key.field.separator \#设置map输出中key内部的分割符
-D num.key.fields.for.partition\#指定分桶时,key按照分隔符切割后,其中用于分桶key所占的列数(配合-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner 使用)
-D stream.reduce.output.field.separator \#设置reduce输出中key和value的分隔符
-D stream.num.reduce.output.key.fields #设置reduce程序分隔符的位置

一个实用的Partitioner类

(二次排序,-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner 选项)

Hadoop 有一个工具类org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner, 它在应用程序中很有用。Map/reduce框架用这个类切分map的输出, 切分是基于key值的前缀,而不是整个key。例如:

$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
    -input myInputDirs \
    -output myOutputDir \
    -mapper org.apache.hadoop.mapred.lib.IdentityMapper \
    -reducer org.apache.hadoop.mapred.lib.IdentityReducer \
    -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
    -jobconf stream.map.output.field.separator=. \
    -jobconf stream.num.map.output.key.fields=4 \
    -jobconf map.output.key.field.separator=. \
    -jobconf num.key.fields.for.partition=2 \
    -jobconf mapred.reduce.tasks=12

其中,-jobconf stream.map.output.field.separator=.-jobconf stream.num.map.output.key.fields=4是前文中的例子。Streaming用这两个变量来得到mapper的key/value对。

上面的Map/Reduce 作业中map输出的key一般是由“.”分割成的四块。但是因为使用了-jobconf num.key.fields.for.partition=2选项,所以Map/Reduce框架使用key的前两块来切分map的输出。其中,-jobconf map.output.key.field.separator=.指定了这次切分使用的key的分隔符。这样可以保证在所有key/value对中, key值前两个块值相同的所有key被分到一组,分配给一个reducer。

这种高效的方法等价于指定前两块作为主键,后两块作为副键。 主键用于切分块,主键和副键的组合用于排序


本地单机调试:

cat tt | sh mapper.sh | sort -k1 | sh reducer.sh
其中tt为初始map输入文件,sort -k1 指定partition的key field=1,如果指定的key field=2可以使用sort -k1,1 -k2,2

常见错误:

1、streaming默认的情况下,mapper和reducer的返回值不是0,被认为异常任务,将被再次执行,默认尝试4次都不是0,整个job都将失败

java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1

at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:311)

at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:545)

at org.apache.hadoop.streaming.PipeReducer.reduce(PipeReducer.java:130)

at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:519)

at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:420)

at org.apache.hadoop.mapred.Child$4.run(Child.java:255)

at java.security.AccessController.doPrivileged(Native Method)

at javax.security.auth.Subject.doAs(Subject.java:396)

at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1093)

at org.apache.hadoop.mapred.Child.main(Child.java:249)

用户可以设定stream.non.zero.exit.is.failure true 或false 来表明streaming task的返回值非零时是 Failure 还是Success。默认情况,streaming task返回非零时表示失败。

解决方法:
hadoopjarhadoop-streaming*.jar
-Dstream.non.zero.exit.is.failure=false

2、在执行streaming任务时,出现:Environment variable CLASSPATH not set!
解决方法:
在执行streaming时,加上选项:
-cmdenvCLASSPATH=$CLASSPATH

3、在调用HDFS的C接口时,出现:Call to JNI_CreateJavaVM failed with error: -1

原因:貌似是因为在编译的时候加上了libjvm,然后动态链接库那也加了。解决方法,编译的时候去掉libjvmm的链接就可以了

4、 -file mapper.sh -file reducer.sh都添加了,但streaming运行到一半就不动了
检查 mapper、reducer 脚本中是否使用/读取了其他文件,需要将这些文件也通过 -file 加入到streaming中

5、注意分行连接符后面不要留空格


】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇hadoop streaming 参数设置 下一篇Hadoop提交作业------>hadoop..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目