A Small Hadoop Program Written in Python
2014-11-24 11:28:01
Tags: Python, Hadoop, program

This program was written on Python 2.3; Python versions differ, so it may need adjusting on other versions.
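One version difference that affects the scripts in this post is the `print` statement they use, which is a syntax error on Python 3. As a small sketch (not part of the original program), the hypothetical helpers `format_error` and `emit_error` below build the same tab-marked error line and write it with `sys.stdout.write`, which behaves the same way on Python 2 and Python 3.

```python
import sys


def format_error(line):
    # Python 2's `print line,"\tERROR"` writes the line, a space,
    # a tab, the word ERROR and a newline; reproduce that exactly.
    return "%s \tERROR\n" % line


def emit_error(line, out=None):
    # `out` is an illustrative parameter so the helper can be pointed
    # at any file-like object; it defaults to standard output.
    if out is None:
        out = sys.stdout
    out.write(format_error(line))


if __name__ == "__main__":
    emit_error("x y")
```

Keeping the string formatting separate from the write call makes the error format easy to check without capturing standard output.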


Mapper:


import sys


line_number = 0
tab_number = 0
pv_number = 0
clk_number = 0
if_compressed_tested = 0
if_compressed = 0


#functions:
def compressed_stat(line):
    # Compressed format: tab-separated fields; every field after the
    # first holds a "pv clk" pair.
    global line_number
    global tab_number
    global pv_number
    global clk_number
    try:
        line_number += 1
        line_split_list = line.split("\t")
        line_split_list_size = len(line_split_list)
        tab_number += (line_split_list_size - 1)
        index = 1  # field 0 is skipped
        while index < line_split_list_size:
            pv_clk_list = line_split_list[index].strip().split(" ")
            pv_number += int(pv_clk_list[0])
            clk_number += int(pv_clk_list[1])
            index += 1
    except (ValueError, IndexError):
        # IndexError covers a field that has a pv value but no clk value.
        print line,"\tERROR"



def before_compress_stat(line):
    # Uncompressed format: each line is a single "pv clk" pair.
    global line_number
    global pv_number
    global clk_number
    try:
        line_number += 1
        line = line.strip()
        line_split_list = line.split(" ")
        pv_number += int(line_split_list[0])
        clk_number += int(line_split_list[1])
    except (ValueError, IndexError):
        # IndexError covers a line that has a pv value but no clk value.
        print line,"\tERROR"
#end functions


for line in sys.stdin:
    try:
        line = line.strip()
        # Detect the input format once, from the first line.
        if if_compressed_tested == 0:
            if_compressed_tested = 1
            if line.find("\t") > 0:
                if_compressed = 1
        if if_compressed == 0:
            before_compress_stat(line)
        else:
            compressed_stat(line)
    except ValueError:
        pass

# After all input is read, emit one summary line for the reducer.
if if_compressed == 1:
    print ("%ld %ld %ld %ld"%(line_number, tab_number, pv_number,clk_number))
else:
    print ("%ld %ld %ld"%(line_number,pv_number,clk_number))
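To make the mapper's input and output formats concrete, here is a minimal Python 3 sketch of the same counting logic as a pure function. The name `summarize` and the sample lines are illustrative assumptions, not part of the original script, and the sketch collects bad lines in a list instead of printing them. As in the listing above, a line with a bad field still counts toward the line total, and pairs parsed before the bad field stay in the sums.

```python
def summarize(lines):
    """Return (summary_line, error_lines) for an iterable of input lines."""
    line_number = tab_number = pv_number = clk_number = 0
    compressed = None  # decided from the first line, as in the mapper
    errors = []
    for raw in lines:
        line = raw.strip()
        if compressed is None:
            compressed = "\t" in line
        line_number += 1
        if compressed:
            fields = line.split("\t")
            tab_number += len(fields) - 1
            pairs = fields[1:]  # field 0 is skipped
        else:
            pairs = [line]
        try:
            for pair in pairs:
                # Unpacking raises ValueError when clk is missing,
                # int() raises ValueError on a non-numeric value.
                pv, clk = [int(x) for x in pair.strip().split(" ")[:2]]
                pv_number += pv
                clk_number += clk
        except ValueError:
            errors.append(line)
    if compressed:
        summary = "%d %d %d %d" % (line_number, tab_number, pv_number, clk_number)
    else:
        summary = "%d %d %d" % (line_number, pv_number, clk_number)
    return summary, errors


if __name__ == "__main__":
    print(summarize(["k1\t1 2\t3 4\n", "k2\t5 6\n"]))
    print(summarize(["1 2\n", "3 4\n"]))
```

Returning the summary instead of printing it keeps the sketch easy to check on a handful of hand-written lines before running the real script under Hadoop.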


Reducer:
import sys



line_number = 0
tab_number = 0
pv_number = 0
clk_number = 0
if_compressed_tested = 0
if_compressed = 0


def compressed_stat(line):
    global line_number
    global tab_number
    global pv_number
    global clk_number
    pv_clk_list = line.split(" ")
    if len(pv_clk_list) != 4:
        print line,"\tERROR"
    else:
        line_number += int(pv_clk_list[0])
        tab_number += int(pv_clk_list[1])
        pv_number += int(pv_clk_list[2])
        clk_number += int(pv_clk_list[3])


def before_compress_stat(line):
    global line_number
    global pv_number
    global clk_number
    pv_clk_list = line.split(" ")
    if len(pv_clk_list) != 3:
        print line,"\tERROR"
    else:
        line_number += int(pv_clk_list[0])
        pv_number += int(pv_clk_list[1])
        clk_number += int(pv_clk_list[2])
#


for line in sys.stdin:
    try:
        line = line.strip()
        # Pass through error lines reported by the mappers.
        if line.count("ERROR") > 0:
            print line
            continue

        # Detect the format from the first well-formed summary line.
        if if_compressed_tested == 0:
            if len(line.split(" ")) == 4:
                if_compressed_tested = 1
                if_compressed = 1
            elif len(line.split(" ")) == 3:
                if_compressed_tested = 1
                if_compressed = 0
            else:
                # Malformed line: report it and keep looking for the format.
                print line,"\tERROR"
                continue

        if if_compressed == 0:
            before_compress_stat(line)
        else:
            compressed_stat(line)
    except ValueError:
        print line, "\tERROR"

if if_compressed == 0:
    print "LINE_NUMBER:",line_number,"TOTAL_PV_NUMBER:",pv_number, "TOTAL_CLK_NUMBER:",clk_number
else:
    print "LINE_NUMBER:",line_number,"TAB_NUMBER:",tab_number,"TOTAL_PV_NUMBER:",pv_number, "TOTAL_CLK_NUMBER:",clk_number
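The reducer's job is to add up the per-mapper summary lines column by column and pass error lines through. A minimal Python 3 sketch of that merge follows; the name `merge_summaries` and the sample lines are assumptions for illustration. The sketch returns the totals as a list (three values for the uncompressed format, four for the compressed one) rather than printing the labelled report, and it takes the format from the first summary line that has three or four fields.

```python
def merge_summaries(lines):
    """Return (totals, error_lines) for an iterable of mapper output lines."""
    totals = None  # becomes a 3- or 4-element list once the format is known
    errors = []
    for raw in lines:
        line = raw.strip()
        if "ERROR" in line:
            # Error lines from the mappers are passed through untouched.
            errors.append(line)
            continue
        parts = line.split(" ")
        if totals is None and len(parts) in (3, 4):
            totals = [0] * len(parts)
        if totals is None or len(parts) != len(totals):
            errors.append(line)
            continue
        try:
            values = [int(p) for p in parts]
        except ValueError:
            errors.append(line)
            continue
        totals = [t + v for t, v in zip(totals, values)]
    if totals is None:
        totals = [0, 0, 0]  # no summary lines seen; report zeros
    return totals, errors


if __name__ == "__main__":
    # Compressed format: line count, tab count, pv total, clk total.
    print(merge_summaries(["2 3 9 12\n", "1 1 5 6\n"]))
    # Uncompressed format with one error line from a mapper.
    print(merge_summaries(["2 4 6\n", "x y \tERROR\n", "1 1 1\n"]))
```

Feeding it the summary strings a mapper would produce gives a quick end-to-end check of the two formats without a cluster.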

