设为首页 加入收藏

TOP

解决flume运行中的一个异常问题!
2019-05-12 14:11:51 】 浏览:131
Tags:解决 flume 运行 一个 异常 问题

今天在本地测试flume的exec 监控文件 分割的问题!!!遇到各种141异常问题!

怀疑是在切割文件的时候超过了监控文本的时间,导致flume异常退出,,,所以增加了keep-alive 时长,,,他的默认值是3秒,,我把它设置为30秒,,,之后运行,,,,他不再异常!!!

解决:设置agent1.channels.<channel_name>.keep-alive = 30

参考文章:问题2,,,,虽然前边的agent,方式可能不一样,但是这个关键的时间是一样的。

-------------------------------------------------以下是原文,原文地址:http://www.tuicool.com/articles/mmm2AvF

flume 问题分析与处理

问题一:

org.apache.flume.EventDeliveryException:Failed to send events

atorg.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:382)

atorg.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)

at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)

at java.lang.Thread.run(Thread.java:722)

Caused by:org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host:10.95.198.123, port: 44444 }: Failed to send batch

at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:294)

atorg.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:366)

... 3 more

Caused by:org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host:10.95.198.123, port: 44444 }: Avro RPC call returned Status: FAILED

atorg.apache.flume.api.NettyAvroRpcClient.waitForStatusOK(NettyAvroRpcClient.java:370)

分析:

代码分析

try {
appendBatch(events, requestTimeout, TimeUnit.MILLISECONDS);
} catch (Throwable t) {
// we mark as no longer active without trying to clean up resources
// client is required to call close() to clean up resources
setState(ConnState.DEAD);
if (t instanceof Error) {
throw (Error) t;
}
if (t instanceof TimeoutException) {
throw new EventDeliveryException(this + ": Failed to send event. " +
"RPC request timed out after " + requestTimeout + " ms", t);
}
throw new EventDeliveryException(this + ": Failed to send batch", t);

请求超时,导致发送event失败

解决:

设置request-timeout长一点,默认20秒

问题二:

org.apache.flume.ChannelException: Unableto put batch on required channel: org.apache.flume.channel.MemoryChannel{name:woStoreSoftWDownloadC2}

atorg.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)

atorg.apache.flume.source.ExecSource$ExecRunnable.flushEventBatch(ExecSource.java:376)

atorg.apache.flume.source.ExecSource$ExecRunnable.run(ExecSource.java:336)

at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)

at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)

at java.util.concurrent.FutureTask.run(FutureTask.java:166)

atjava.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)

atjava.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)

at java.lang.Thread.run(Thread.java:722)

Caused by:org.apache.flume.ChannelException: Space for commit to queue couldn't beacquired Sinks are likely not keeping up with sources, or the buffer size istoo tight

atorg.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:128)

atorg.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)

atorg.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)

... 8 more

30 Mar 2014 10:16:00,960 ERROR[timedFlushExecService18-0](org.apache.flume.source.ExecSource$ExecRunnable$1.run:322) - Exception occured when processing eventbatch

org.apache.flume.ChannelException: Unableto put batch on required channel: org.apache.flume.channel.MemoryChannel{name:woStoreSoftWDownloadC2}

atorg.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)

atorg.apache.flume.source.ExecSource$ExecRunnable.flushEventBatch(ExecSource.java:376)

atorg.apache.flume.source.ExecSource$ExecRunnable.access$100(ExecSource.java:249)

at org.apache.flume.source.ExecSource$ExecRunnable$1.run(ExecSource.java:318)

atjava.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)

atjava.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351)

at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178)

at

代码分析:

protected void doCommit() throws InterruptedException {
int remainingChange = takeList.size() - putList.size();
if(remainingChange < 0) {
if(!queueRemaining.tryAcquire(-remainingChange, keepAlive, TimeUnit.SECONDS)) {
throw new ChannelException("Space for commit to queue couldn't be acquired" +
" Sinks are likely not keeping up with sources, or the buffer size is too tight");
}
}
int puts = putList.size();
int takes = takeList.size();
synchronized(queueLock) {
if(puts > 0 ) {
while(!putList.isEmpty()) {
if(!queue.offer(putList.removeFirst())) {
throw new RuntimeException("Queue add failed, this shouldn't be able to happen");
}
}
}
putList.clear();
takeList.clear();
}
queueStored.release(puts);
if(remainingChange > 0) {
queueRemaining.release(remainingChange);
}
if (puts > 0) {
channelCounter.addToEventPutSuccessCount(puts);
}
if (takes > 0) {
channelCounter.addToEventTakeSuccessCount(takes);
}
channelCounter.setChannelSize(queue.size());
}

等待keep-alive之后,还是没办法插入event

解决方案:

设置keep-alive(默认3秒),capacity(100),transactionCapacity(100)大一点

-----------------------------------------------------------------------------------------------

之后,前边的异常少了,,后边的异常是,,,产生消息快,命令消费慢!!!导致管道满!!!!

这个就需要对端加大管道的消费速度,来调整???,,,,,怀疑之,,,然后增大空间和传输空间!

-----------更改结果如下!

stage_nginx.channels.M1.capacity = 1000
stage_nginx.channels.M1.transactionCapacity = 1000
stage_nginx.channels.M1.keep-alive = 30

---------

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇【Flume】文件收集框架Flume 下一篇Java内容的复习-大数据

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目