设为首页 加入收藏

TOP

flume1.5 kafka0.8 logstash2.4 es5.6 错误日志聚集解决方案
2018-12-25 14:09:31 】 浏览:184
Tags:flume1.5 kafka0.8 logstash2.4 es5.6 错误 日志 聚集 解决方案

错误日志聚集解决方案

简述:

目的:对分散在各集群上的错误日志信息进行实时采集聚集,

由flume采集给kafka, 之后消费到logstash进行日志解析,最终将解析的json格式数据交由elasticsearch,利用kinbana进行查询,图表展示。

目前版本为 flume 1.5 kafka0.8.1.1

原本存在的问题:flume采集时如果文件发生了变化无法判断是否为原文件,因而无法进行采集。

解决方案:

利用Flume 1.7+提供的 Source taildir 可根据unix系统下文件唯一标识读取文件。将taildir作为自定义source使用即可。

采集后的日志文件如果存储在hbase中,如果需要多纬度查询,则开发较复杂。

解决方案:

利用ES与kinbana结合可以轻松展示采集到的日志信息,并且可以利用解析的数据进行多维度分析,并且可以简便地生成图表。

其他:

针对Error日志java 异常抛出时会产生多行日志,需要采集的时候合并行,因而重新自定义了 source 的readEvents方法。(有已存在的解决方案http://www.androidev.com/2017/12/03/中国民生银行大数据团队的flume实践-2017-12-02/)

根据现状选择的版本:

Flume 1.5 kafka 0.8.1.1 logstash 2.4 es 5.6

效果达成:

hadoop 的错误日志 统计


在flume的配置文件中



Flume配置解释

图为自定义的multiline配置

自定义了在传给kafka之前为日志添加前缀,hostname 及 frame 用于增添维度。

agent.sinks.kafkaSink.split 可以设置分隔符默认 @,

logstash配置:

Input:kafka topic设置

filter部分解释:

用于解析日志生成json 给es 并去除多余信息。

示例:

dev-hadoop6@,hadoop@,2018-02-0614:53:58,198 ERROR [http-nio-8080-exec-1] db.KafKaInfo - kafka getGroupInfofailed

https://grokdebug.herokuapp.com/可以进行grok的测试

(<hostname>[a-zA-Z0-9-]+)@,(<frame>[a-zA-Z0-9-]+)@,(<time>%{YEAR:year}-%{MONTHNUM:month}-%{MONTHDAY:day}\s%{HOUR:hour}:%{MINUTE:minutes}:%{SECOND:seconds},\d{3})\s%{LOGLEVEL:level}(<info>.*)

红色部分为解析后的json内容其余部分由remove_field抛弃(默认传递过来的k,v是message,日志信息 如果丢弃message则相当于丢弃了原本的日志,而解析的部分将传给ES。)

基本规则:

(<key的值>匹配该值的正则)

例如dev-hadoop6

(<hostname>[a-zA-Z0-9-]+) 会将他转换为

{

"hostname": [

[

"dev-hadoop6"

]

]

}

Logstash中有其自带的pattern 使用方法即 %{HOUR:键的名称}

HOUR在源码中对应 (logstash 2.4)

ventor/bundle/jruby/1.9/gems/logstash-patterns-core-2.0.5/patterns

./grok-patterns:

HOUR (:2[0123]|[01][0-9])

可仿照该方式自定义,只要放在该目录下即可。

内部实现为ruby的正则。

Output: ElasticSearch设置

Stdout输出到控制台 用于测试观察

注:如果条件允许可以使用filebeat替代flume 更加方便(flume源码是javafilebeatruby)。

执行命令语句:

启动logstash

nohup bin/logstash -f normalLog.conf>/dev/null 2>&1 &

bin/flume-ngagent -n agent --conf conf --conf-file conf/hadoopLog.conf -Dflume.root.logger=INFO,console

10.100.3.174:~/cluster/flume-release

nohup bin/flume-ng agent -n agent --confconf --conf-file conf/esLog.conf -Dflume.monitoring.type=http-Dflume.monitoring.port=5658 >/dev/null 2>&1 &

nohup bin/flume-ng agent -n agent --confconf --conf-file conf/hadoopLog.conf -Dflume.monitoring.type=http-Dflume.monitoring.port=5655 >/dev/null 2>&1 &

nohup bin/flume-ng agent -n agent --confconf --conf-file conf/kafkaLog.conf -Dflume.monitoring.type=http-Dflume.monitoring.port=5657 >/dev/null 2>&1 &

nohup bin/flume-ng agent -n agent --confconf --conf-file conf/hbaseLog.conf -Dflume.monitoring.type=http-Dflume.monitoring.port=5656 >test/flumelogs/hbase 2>&1 &

nohup bin/flume-ng agent -n agent --confconf --conf-file conf/flumeLog.conf -Dflume.monitoring.type=http-Dflume.monitoring.port=5659 >/dev/null 2>&1 &

需要先创建该topic

bin/kafka-topics.sh--create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topicfilebeat

./kafka-console-consumer.sh -zookeeper10.100.3.175:2181 --from-beginning --topic filebeat

需要根据不同框架的日志自定义采集方式,存储的frame,及解析方式。进行配置文件的改写。

正则优化:

可以匹配的日志格式:

除去自定义的header 日期格式无论被[] 包裹与否,以T或者空格分隔日与小时与否,日志等级level 是否被[]包裹与否皆可匹配。

一:

dev-hadoop6@,elasticsearch@,[2018-02-09T12:45:11,489][WARN][o.e.t.n.Netty4Transport ] [node-3]exception caught on transport layer [[id: 0xf8399a18, L:/10.100.3.176:9300 !R:/10.100.51.28:58432]], closing connection

io.netty.handler.codec.DecoderException:java.io.StreamCorruptedException: invalid data length: 0

atio.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:459)~[netty-codec-4.1.13.Final.jar:4.1.13.Final]

二:

dev-hadoop6@,hadoop@,2018-02-0614:53:58,198 ERROR [http-nio-8080-exec-1] db.KafKaInfo - kafka getGroupInfofailed

三:

[2018-02-09 14:42:52,976] ERROR Closingsocket for /10.100.3.176 because of error (kafka.network.Processor)

java.io.IOException: 断开的管道

at sun.nio.ch.FileDispatcherImpl.write0(Native Method)

at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)

at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)

at sun.nio.ch.IOUtil.write(IOUtil.java:65)

at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)

at kafka.api.TopicDataSend.writeTo(FetchResponse.scala:122)

at kafka.network.MultiSend.writeTo(Transmission.scala:102)

(<hostname>[a-zA-Z0-9-]+)@,(<frame>[a-zA-Z0-9-]+)@,((\[)|([\s]*))(<time>%{YEAR:year}-%{MONTHNUM:month}-%{MONTHDAY:day}(([T]{1})|([ ]{1}))%{HOUR:hour}:%{MINUTE:minutes}:%{SECOND:seconds},\d{3})((\][\s]*)|([\s]*))((\[)|([\s]*))%{LOGLEVEL:level}[\s]*((\][\s]*)|([\s]*))(<info>.*)

对log4j2中出现的[ ]进行了判断。


遇到过的问题:

Flume 长时间启动后可能产生oom 具体缘由未发现,症状为linux中进程还在,但是以及停止了日志采集工作。调大了flume启动的jvm内存后暂时未发现状况。(jdk1.8后永生代由jvm自己控制,应该不是代码的原因)在flume的日志中发现了channel沾满的记录,可能因此导致内存溢出?扩大了channel容量。

最新的配置文件截图:

Flume日志的收集。


Logstash 按月建立索引 给es


】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇flume读取日志数据写入kafka &nbs.. 下一篇flume读取binlog与kafka整合

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目