版权声明: https://blog.csdn.net/hejiangtju/article/details/80366832
1. 问题现象
在使用 Flume 将数据从 Kafka 加载到 hive 的过程中,我们遇到一个问题:每天晚上 Flume 的 Hive Sink 总会报错,然后停止工作:
15 juil. 2016 21:40:43,008 INFO [hive-hive1-call-runner-0] (org.apache.flume.sink.hive.HiveWriter$2.call:238) - Sending heartbeat on batch TxnIds= [3755...3764] on endPoint = {metaStoreUri=... 15 juil. 2016 22:12:21,001 INFO [hive-hive1-call-runner-0] (org.apache.flume.sink.hive.HiveWriter$2.call:231) - Sending heartbeat on batch TxnIds= [3785...3794] on endPoint = {metaStoreUri=...
15 juil. 2016 22:27:56,963 INFO [hive-hive1-call-runner-0] (org.apache.flume.sink.hive.HiveWriter$2.call:231) - Sending heartbeat on batch TxnIds= [3795...3804] on endPoint = {metaStoreUri=...
这个问题的神奇之处在于,我们第二天重启 Flume,又能一直正常工作,直到第二天晚上...
2. 问题分析
从搜索中,我们发现其他人也遇到了同样的问题:https://issues.apache.org/jira/browse/FLUME-2956 ,而且目前还无解,看来要找现成的解决方案不太可能了,只能从源码入手。
在 Flume 的 Hive Sink 源码中,我们看到如下问题:
a. 异常捕获不充分(HiveSink.java :321);另外,在异常处理中还可能抛出异常()
这里只捕获了HiveWriter.Failure,而实际情况中抛出的异常是从 RuntimeException 继承来的,也就直接抛出去了;所以后面的补救措施:
} catch (HiveWriter.Failure e) { // in case of error we close all TxnBatches to start clean next time LOG.warn(getName() + " : " + e.getMessage(), e); abortAllWriters(); closeAllWriters(); throw e;
}
没有得到执行。
另外由于 Hive Stream API 为了简化代码,将异常从 RuntimeException 继承来,导致这里的开发人员忘了捕获abortAllWriters() 和closeAllWriters() 中可能抛出的异常。
b. 发送 heartbeat 的工作在 HiveWriter.flush (HiveWriter.java :190) 中进行,但是这个函数只在刚刚发送数据的 activeWriters 上调用
这个有问题。因为 hive writer 是重复使用的,如果很长一段时间后再来数据,那么在这段中断的时间内,服务器(metastore) 就会关掉这个连接。然后客户端 (hive Sink) 再次发送 heartbeat 就会发生我们看到的现象。
3. 问题处理
从以上分析可以看到,我们需要进行的修补工作包括:
a. 在 (HiveSink.java:321) 中 捕获其他异常, 并 捕获 abortAllWriters() 和 closeAllWriters() 中可能抛出的异常,保证充分丢弃并关闭所有 HiveWriter
b. 在空闲的 HiveWriter 上发送 heartbeat;并且及时清理空闲的 HiveWriter。
详细修改请参考https://github.com/apache/flume/pull/206 。
经测试,以上处理完全修正 Flume Hive Sink 一段时间无数据导致的停止工作问题。