Hadoop Pig: Common Functions
1. Introduction to Pig
Pig here does not refer to the animal (the naming choice is the developers' own); it is a data-processing tool commonly used together with Hadoop for large-scale data analysis and batch processing.
Pig has its own query language (similar in spirit to MySQL's SQL), so compared with raw MapReduce it is better suited to data-filtering and transformation tasks of moderate complexity: a few lines of Pig Latin can deliver what might otherwise take hundreds of lines of MapReduce code.
2. Pig expressions

Category | Description | Example
---------|-------------|--------
Constant | Literal values, like constants in a programming language (strings, integers, etc.) | 'abc', 1, 1.5
Field (by name) | Reference a field of a relation by name (like reading one column of a MySQL row) | .name
Field (by position) | Reference a field of a relation by its position | .$1
Field (disambiguation) | After a JOIN, field names can be ambiguous; prefix them with the relation name | A::name
Cast | Explicit type conversion | (float)'1.22'
Arithmetic | Mathematical operations | x+y, x-y, x*y, x/y, x%y
Ternary (bincond) | Works like the ternary operator in programming languages | (x == 1 ? x : 0)
Comparison | Comparison operators | ==, !=, >, <, >=, <=, matches, is null, is not null
Logical | Like MySQL's logical operators | AND, OR, NOT
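To make the table concrete, here is a minimal sketch that combines several of these expressions in one FOREACH (it assumes the test_input file and schema introduced in section 4; the alias label and the threshold 170 are chosen purely for illustration):

A = LOAD 'test_input' AS (name:chararray, height:int, weight:int);
B = FOREACH A GENERATE
        name,                                         -- field referenced by name
        (float)height,                                -- explicit cast
        (height >= 170 ? 'tall' : 'short') AS label,  -- ternary (bincond)
        weight % 100;                                 -- arithmetic
DUMP B;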
3. Pig data types

Category | Type | Description | Example
---------|------|-------------|--------
Numeric | int | 32-bit signed integer | 1
Numeric | long | 64-bit signed integer | 1L
Numeric | float | 32-bit floating-point number | 1.0F
Numeric | double | 64-bit floating-point number | 1.0
String | chararray | String in UTF-16 format | 'abcdefg'
Binary | bytearray | Byte array (binary blob) | 
Structured | tuple | Like one MySQL row (an ordered set of fields) | (1, 2, 'abc')
Structured | bag | Like a set of MySQL rows (a collection of tuples) | {(1,2),(3,4)}
Structured | map | Like JSON (keys must be chararrays) | ['xingming'#'luoxuan']
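A minimal sketch of how these types appear in practice (the file students.txt and its fields are illustrative and not part of the test data used later; TOTUPLE, TOBAG and TOMAP are Pig built-ins that construct the complex types from scalar values):

-- schema with scalar and complex types (illustrative field names)
students = LOAD 'students.txt' AS (
    name:chararray,                                -- chararray
    age:int,                                       -- int
    scores:bag{t:(subject:chararray, score:int)},  -- bag of tuples
    info:map[]                                     -- map
);
-- build tuple, bag and map values from scalars
B = FOREACH students GENERATE
    name,
    TOTUPLE(name, age)  AS pair,   -- tuple, e.g. (luoxuan,20)
    TOBAG(age, age + 1) AS ages,   -- bag,   e.g. {(20),(21)}
    TOMAP('name', name) AS m;      -- map,   e.g. [name#luoxuan]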
4. Common Pig functions
Data preparation:
test_input contains the following (fields are separated by \t by default):
luoxuan 170 150
gongmengnan 160 130
chenxiaolong 165 150
luoxuan 175 150
test_score contains the following (fields are separated by \t by default):
luoxuan 100 100 90
gongmengnan 40 50 80
chenxiaolong 80 80 70
4.1 Loading data
LOAD ... AS ...: loads data, maps it onto a schema, and assigns the result to a relation.
Exp:
A = LOAD 'test_input' AS (name:chararray, height:int, weight:int);
4.2 Output functions
4.2.1 DUMP
Prints the contents of a relation.
Exp:
DUMP A;
Output:
(luoxuan,170,150)
(gongmengnan,160,130)
(chenxiaolong,165,150)
(luoxuan,175,150)
4.2.2 STORE ... INTO
Writes the contents of a relation to a file. Pig creates the output directory (which must not already exist) under the current directory and puts the result files inside it.
Exp:
STORE C INTO 'output';
4.2.3 DESCRIBE
Shows the schema of a relation.
Exp:
DESCRIBE A;
A: {name: chararray, height: int, weight: int}
4.2.4 ILLUSTRATE
Shows a sample of the data (a record is picked at random for inspection).
Exp:
ILLUSTRATE A;
------------------------------------------------------
| A    | name:chararray   | height:int | weight:int  |
------------------------------------------------------
|      | gongmengnan      | 160        | 130         |
------------------------------------------------------
4.3 Relation operations
4.3.1 FOREACH ... GENERATE
Iterates over each record of a relation and projects the requested fields (similar to SELECT a, b, c FROM table).
Exp:
B = FOREACH A GENERATE name;
DUMP B;
(luoxuan)
(gongmengnan)
(chenxiaolong)
(luoxuan)
4.3.2 JOIN
Joins two relations (like MySQL INNER JOIN; the join fields must have the same data type).
Exp:
A = LOAD '/user/hadoop/test_pig_data/test_input' AS (name:chararray, height:int, weight:int);
B = LOAD '/user/hadoop/test_pig_data/test_score' AS (name:chararray, math_score:int, english_score:int, physical_score:int);
C = JOIN A BY name, B BY name;
DUMP C;
(luoxuan,175,150,luoxuan,100,100)
(luoxuan,170,150,luoxuan,100,100)
(gongmengnan,160,130,gongmengnan,40,50)
(chenxiaolong,165,150,chenxiaolong,80,80)
4.3.3 RIGHT OUTER JOIN
Right outer join (like MySQL RIGHT OUTER JOIN; the join fields must have the same data type). The output below assumes test_score also contains a row for wangqiang that has no match in test_input.
Exp:
C = JOIN A BY name RIGHT OUTER, B BY name;
DUMP C;
(luoxuan,175,150,luoxuan,100,100)
(luoxuan,170,150,luoxuan,100,100)
(,,,wangqiang,100,60)
(gongmengnan,160,130,gongmengnan,40,50)
(chenxiaolong,165,150,chenxiaolong,80,80)
4.3.4 LEFT OUTER JOIN
Left outer join (like MySQL LEFT OUTER JOIN; the join fields must have the same data type).
Exp:
Add a row to test_input that has no match in test_score:
liudehua 160 160
C = JOIN A BY name LEFT OUTER, B BY name;
DUMP C;
(luoxuan,175,150,luoxuan,100,100)
(luoxuan,170,150,luoxuan,100,100)
(liudehua,160,160,,,)
(gongmengnan,160,130,gongmengnan,40,50)
(chenxiaolong,165,150,chenxiaolong,80,80)
4.3.5 FULL OUTER JOIN
Full outer join (like MySQL FULL OUTER JOIN; the join fields must have the same data type).
Exp:
C = JOIN A BY name FULL OUTER, B BY name;
DUMP C;
(luoxuan,175,150,luoxuan,100,100)
(luoxuan,170,150,luoxuan,100,100)
(liudehua,160,160,,,)
(,,,wangqiang,100,60)
(gongmengnan,160,130,gongmengnan,40,50)
(chenxiaolong,165,150,chenxiaolong,80,80)
4.3.6 UNION
Merges two relations (like MySQL's UNION).
Exp:
A = LOAD '/user/hadoop/test_pig_data/test_input' AS (name:chararray, height:int, weight:int);
B = LOAD '/user/hadoop/test_pig_data/test_score' AS (name:chararray, math_score:int, english_score:int, physical_score:int);
C = UNION A, B;
DUMP C;
(luoxuan,100,100,90)
(gongmengnan,40,50,80)
(chenxiaolong,80,80,70)
(wangqiang,100,60,60)
(luoxuan,170,150)
(gongmengnan,160,130)
(chenxiaolong,165,150)
(luoxuan,175,150)
4.3.7 LIMIT
Limits the number of records returned. Unlike MySQL's LIMIT, it does not reduce the amount of data read from the file system; the whole input is still read.
Exp (D here is a previously built single-column relation; LIMIT keeps only its first three records):
E = LIMIT D 3;
DUMP E;
(7)
(11)
(12)
4.3.8 ORDER
Sorts a relation (like MySQL ORDER BY; append ASC or DESC after the field, the default is ASC; null sorts lower than any value and therefore comes first).
Exp:
B = ORDER A BY name;
DUMP B;
(,,)
(chenxiaolong,165,150)
(gongmengnan,160,130)
(liudehua,160,160)
(luoxuan,175,150)
(luoxuan,170,150)
4.4 Filtering functions
4.4.1 FILTER
Filters records by a condition (like MySQL's WHERE).
Exp:
C = FILTER A BY name == 'luoxuan';
DUMP C;
(luoxuan,170,150)
(luoxuan,175,150)
4.4.2 matches
Performs regular-expression matching. Inside the quoted string, \ is the escape character and \\ stands for a literal \, so the usual regex \w becomes \\w in Pig.
Exp:
C = FILTER A BY name matches '\\w+ong\\w*';
DUMP C;
(gongmengnan,160,130)
(chenxiaolong,165,150)
4.5 Data manipulation functions
4.5.1 SIZE
Returns a length (the number of elements in a bag, the length of a string, and so on). The example below also uses the bincond (ternary) operator, which works like the ternary operator in C/C++.
Exp:
D = FOREACH A GENERATE ((name is not null) ? SIZE(name) : 0);
DUMP D;
(7)
(11)
(12)
(7)
(8)
4.5.2 SUBSTRING
Extracts part of a string (like MySQL SUBSTR). The first argument is the source string, the second is the start index, and the third is the end index (exclusive).
Exp:
C = FOREACH A GENERATE SUBSTRING(name, 0, 4), height, weight;
DUMP C;
(luox,170,150)
(gong,160,130)
(chen,165,150)
(luox,175,150)
4.5.3 CONCAT
Concatenates two strings (to concatenate more than two, nest the calls: CONCAT(C, CONCAT(A, B))).
Exp:
C = FOREACH A GENERATE CONCAT(name, 'hehe'), height, weight;
DUMP C;
(luoxuanhehe,170,150)
(gongmengnanhehe,160,130)
(chenxiaolonghehe,165,150)
(luoxuanhehe,175,150)
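For more than two strings, the nested form looks like the sketch below (the prefix 'Mr.' and suffix '!' are chosen purely for illustration; CONCAT expects chararray or bytearray arguments, so a numeric field would first need a cast to chararray):

D = FOREACH A GENERATE CONCAT('Mr.', CONCAT(name, '!')), height, weight;
DUMP D;
-- e.g. (Mr.luoxuan!,170,150), (Mr.gongmengnan!,160,130), ...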
4.6 Aggregate functions
4.6.1 GROUP
Groups the records of a relation.
Exp:
B = GROUP A BY name;
DUMP B;
(luoxuan,{(luoxuan,170,150),(luoxuan,175,150)})
(chenxiaolong,{(chenxiaolong,165,150)})
(gongmengnan,{(gongmengnan,160,130)})
4.6.2 COUNT
Counts the records in each group.
Exp:
D = FOREACH B GENERATE group, COUNT(A);
DUMP D;
(luoxuan,2)
(gongmengnan,1)
(chenxiaolong,1)
4.6.3 SUM
Sums values (like MySQL SUM).
Exp:
A = LOAD '/user/hadoop/test_pig_data/test_input' AS (name:chararray, height:int, weight:int);
B = LOAD '/user/hadoop/test_pig_data/test_score' AS (name:chararray, math_score:int, english_score:int);
C = UNION A, B;
D = GROUP C BY $0;
E = FOREACH D GENERATE group, SUM(C.$1);
DUMP E;
(luoxuan,445)
(wangqiang,100)
(gongmengnan,200)
(chenxiaolong,245)
4.6.4 COGROUP
COGROUP groups multiple relations together by a key. In effect it first performs an outer join (compare MySQL outer joins) on the grouping field and then presents the joined data grouped by that field.
Exp:
A = LOAD '../test_pig_data/test_input' AS (name:chararray, height:int, weight:int);
B = LOAD '../test_pig_data/test_score' AS (name:chararray, math_score:int, english_score:int);
C = COGROUP A BY name, B BY name;
DUMP C;
(luoxuan,{(luoxuan,170,150),(luoxuan,175,150)},{(luoxuan,100,100)})
(wangqiang,{},{(wangqiang,100,60)})
(gongmengnan,{(gongmengnan,160,130)},{(gongmengnan,40,50)})
(chenxiaolong,{(chenxiaolong,165,150)},{(chenxiaolong,80,80)})
4.7 Other functions
4.7.1 STREAM ... THROUGH
Pipes a relation through an external shell command (each record is written to the command's standard input, and the command's output becomes the new relation).
Exp:
D = STREAM A THROUGH `awk '{print $1;}'`;
DUMP D;
(luoxuan)
(gongmengnan)
(chenxiaolong)
(luoxuan)
4.7.2 FLATTEN
Un-nests a tuple or bag, similar in spirit to Python's * unpacking operator: it turns (1, 2, 3) into 1, 2, 3.
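A minimal sketch, reusing the grouped relation from section 4.6.1 (B = GROUP A BY name): applying FLATTEN to the grouped bag turns each group back into individual output records:

B = GROUP A BY name;
-- without FLATTEN each record would carry a bag of heights;
-- FLATTEN expands the bag into one (group, height) row per element
C = FOREACH B GENERATE group, FLATTEN(A.height);
DUMP C;
-- expected output (order may vary):
-- (luoxuan,170)
-- (luoxuan,175)
-- (gongmengnan,160)
-- (chenxiaolong,165)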
5. Common issues
5.1 How to delete Pig run logs
Pig writes a log file into the current directory each time it runs. To delete these logs in one command:
ls -l | grep 'pig_.*\.log' | awk '{print $9}' | xargs rm -f
5.2 Pig cannot connect to Hadoop
ERROR 4010: Cannot find hadoop configurations in classpath (neither hadoop-site.xml nor core-site.xml was found in the classpath). If you plan to use local mode, please put -x local option in command line:
Edit the profile file:
vi /etc/profile
Add the environment variables below; PIG_CLASSPATH must point at the directory containing core-site.xml (for Hadoop 2.x this is usually $HADOOP_HOME/etc/hadoop):
export HADOOP_HOME=/usr/hadoop/hadoop-2.6.4
export PIG_CLASSPATH=$HADOOP_HOME/etc/hadoop
export PATH="$HADOOP_HOME/bin:$PIG_CLASSPATH:$PATH"
5.3 Pig jobs run too slowly
Try raising the job priority by putting the following line at the very top of the script:
set job.priority HIGH;
5.4 Naming a Pig job so it can be identified on the Hadoop job-monitoring page
Put the following line at the very top of the script:
set job.name 'hehe job';
6. References
Hadoop: The Definitive Guide, 3rd Edition