设为首页 加入收藏

TOP

Flume RegexHbaseEventSerializer自定义rowKey
2018-11-29 10:51:33 】 浏览:59
Tags:Flume RegexHbaseEventSerializer 定义 rowKey
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/tom_fans/article/details/80829962
上篇Flume谈到setwritewal出错的问题,通过注释了3行代码。但是由于rowkey默认是自动产生的,产生的规则通过源代码可以看出,规则是:
String rowKey = String.format("%s-%s-%s", cal.getTimeInMillis(), randomKey, nonce.getAndIncrement());

如果要自定义rowkey,修改源代码是唯一的办法,RegexHbaseEventSerializer.java就是我们要修改的文件。我们可以新建一个类来继承,原始文件不要去修改。不过今天我测试的时候是直接修改源文件的。

我的需要的rowkey是:B13612145#1529637655#3#1530085147015#54

String rowKey = String.format("%s#%s#%s#%s#%s", machineNo, fileTimeStamp , fileNo , cal.getTimeInMillis(), nonce.getAndIncrement());

前3个字段都在文件名中,这就意味着,我必须解析文件名获得这3个字段,那么配置文件必须添加header:

a1.sources.r1.type = spooldir  
a1.sources.r1.spoolDir = /data/flume/r1/data
a1.sources.r1.batchSize = 100
#a1.sources.r1.fileHeader = true
a1.sources.r1.basenameHeader = true
a1.sources.r1.channels = c1  

OK,来整理一下思路,我需要解析文件名来获取3个字段作为rowkey组成部分,那么配置需要添加header,然后把源代码自动生成rowkey的规则替换成我们自己的规则,就是这么简单。

1. 新建解析文件名的方法:

	public String splitFileName() {
		for (Map.Entry<String, String> entry : headers.entrySet()) {
			return entry.getValue();
		}

		return null;
	}

既然是要解析文件名,很显然要知道怎么获取文件名,从代码可以知道headers.entrySet就是获取header的方法,因为我就一个header,所以一次循环就return结果。

2. 替换默认rowkey生成规则

	protected byte[] getRowKey(Calendar cal) {
		/*
		 * NOTE: This key generation strategy has the following properties:
		 * 
		 * 1) Within a single JVM, the same row key will never be duplicated. 2)
		 * Amongst any two JVM's operating at different time periods (according
		 * to their respective clocks), the same row key will never be
		 * duplicated. 3) Amongst any two JVM's operating concurrently
		 * (according to their respective clocks), the odds of duplicating a
		 * row-key are non-zero but infinitesimal. This would require
		 * simultaneous collision in (a) the timestamp (b) the respective nonce
		 * and (c) the random string. The string is necessary since (a) and (b)
		 * could collide if a fleet of Flume agents are restarted in tandem.
		 * 
		 * Row-key uniqueness is important because conflicting row-keys will
		 * cause data loss.
		 */

		this.fileName = splitFileName();
		this.machineNo = fileName.split("_")[1];
		this.fileTimeStamp = fileName.split("_")[2];
		this.fileNo = fileName.split("_")[3].split("\\.")[0];
		String rowKey = String.format("%s#%s#%s#%s#%s", machineNo, fileTimeStamp , fileNo , cal.getTimeInMillis(), nonce.getAndIncrement());

		//String rowKey = String.format("%s-%s-%s", cal.getTimeInMillis(), randomKey, nonce.getAndIncrement());
		return rowKey.getBytes(charset);
	}

注释掉的那行就是默认的规则,新的是我自己要的规则。

就这样完成了,打个包替换之前的包,消费一个文件来测试,结果正如我们所期望的:

 B13612145#1529637655#3#1530085147016#55   column=cf:cnc_exeprgname, timestamp=1530085147229, value=418                                                                
 B13612145#1529637655#3#1530085147016#55   column=cf:cnc_rdspmeter[0], timestamp=1530085147229, value=0                                                                
 B13612145#1529637655#3#1530085147016#55   column=cf:cnc_rdsvmeter, timestamp=1530085147229, value=7,7,93,0                                                            
 B13612145#1529637655#3#1530085147016#55   column=cf:cnc_statinfo[3], timestamp=1530085147229, value=3                                                                 
 B13612145#1529637655#3#1530085147016#55   column=cf:ext_toolno, timestamp=1530085147229, value=30                                                                     
 B13612145#1529637655#3#1530085147017#56   column=cf:cnc_exeprgname, timestamp=1530085147229, value=418                                                                
 B13612145#1529637655#3#1530085147017#56   column=cf:cnc_rdspmeter[0], timestamp=1530085147229, value=0                                                                
 B13612145#1529637655#3#1530085147017#56   column=cf:cnc_rdsvmeter, timestamp=1530085147229, value=6,11,92,0                                                           
 B13612145#1529637655#3#1530085147017#56   column=cf:cnc_statinfo[3], timestamp=1530085147229, value=3                                                                 
 B13612145#1529637655#3#1530085147017#56   column=cf:ext_toolno, timestamp=1530085147229, value=30                                                                     
 B13612145#1529637655#3#1530085147018#57   column=cf:cnc_exeprgname, timestamp=1530085147229, value=418                                                                
 B13612145#1529637655#3#1530085147018#57   column=cf:cnc_rdspmeter[0], timestamp=1530085147229, value=0                                                                
 B13612145#1529637655#3#1530085147018#57   column=cf:cnc_rdsvmeter, timestamp=1530085147229, value=6,7,93,0                                                            
 B13612145#1529637655#3#1530085147018#57   column=cf:cnc_statinfo[3], timestamp=1530085147229, value=3                                                                 
 B13612145#1529637655#3#1530085147018#57   column=cf:ext_toolno, timestamp=1530085147229, value=30                                                                     
 B13612145#1529637655#3#1530085147018#58   column=cf:cnc_exeprgname, timestamp=1530085147229, value=418                                                                
 B13612145#1529637655#3#1530085147018#58   column=cf:cnc_rdspmeter[0], timestamp=1530085147229, value=0                                                                
 B13612145#1529637655#3#1530085147018#58   column=cf:cnc_rdsvmeter, timestamp=1530085147229, value=5,8,93,0                                                            
 B13612145#1529637655#3#1530085147018#58   column=cf:cnc_statinfo[3], timestamp=1530085147229, value=3                                                                 
 B13612145#1529637655#3#1530085147018#58   column=cf:ext_toolno, timestamp=1530085147229, value=30                                                                     
 B13612145#1529637655#3#1530085147019#59   column=cf:cnc_exeprgname, timestamp=1530085147229, value=418                                                                
 B13612145#1529637655#3#1530085147019#59   column=cf:cnc_rdspmeter[0], timestamp=1530085147229, value=0                                                                
 B13612145#1529637655#3#1530085147019#59   column=cf:cnc_rdsvmeter, timestamp=1530085147229, value=7,8,93,0                                                            
 B13612145#1529637655#3#1530085147019#59   column=cf:cnc_statinfo[3], timestamp=1530085147229, value=3                                                                 
 B13612145#1529637655#3#1530085147019#59   column=cf:ext_toolno, timestamp=1530085147229, value=30         


今天测试的时候碰到2个问题:

1. 消费文件有几次出现文件名已经修改为.COMPLETE,但是我HBASE数据没有任何增加,而且没有报任何错误,。给我的感觉就是没有消费。测试了几次,都是如此,很是困惑,后来突然想起来之前有人提到过如果一个很大的文件需要放到spooldir目录会发生错误,因为文件一进去就会消费,但是文件又在拷贝过程。后来我改成先把原始文件名添加.COMPLETE,拷贝完成之后,再修改文件名去掉.COMPLETE.

2. 时间冲突

rowkey的规则里有时间,我有一个文件60行数据,消费之后只有48条,因为之前我同过spark 消费也出现过这个问题,因此很容易知道这是因为rowkey冲突了,导致数据覆盖了,因此把源文件的nonce.getAndIncrement()加到ROWKEY即可。

简单说就是循环的过程cal.getTimeInMillis()这个玩意会可能重复,很多人觉得微秒级别不应该出现重复,事实上我碰到过2次,因此现在对通过时间作为rowkey格外小心。



】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇4.Flume三大组件Source、channel.. 下一篇Flume集群安装配置

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目