ELK + Kafka Integration
2019-04-23 14:31:26
Tags: ELK, Kafka, integration
1. Since this project uses Log4j2, the Kafka appender can be configured directly in the Log4j2 configuration:

<Kafka name="Kafka" topic="XX_log">
    <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss}||%p||%c{1}||XX_web||%m%n"/>
    <Property name="bootstrap.servers">127.0.0.1:9092</Property>
    <Property name="timeout.ms">500</Property>
</Kafka>

The PatternLayout joins the fields with || so that Logstash can split them apart later. The timeout.ms property is added so that an outage of the logging system will not seriously affect the business system. Kafka can of course be deployed as a cluster, with multiple addresses in bootstrap.servers separated by commas. XX_web identifies the current business platform.
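For illustration, a minimal sketch of the producing side, assuming the Kafka appender above is attached to a logger in log4j2.xml; the class name OrderService and the message text are hypothetical. Each logging call emits one ||-delimited line onto the XX_log topic:

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class OrderService {
    private static final Logger log = LogManager.getLogger(OrderService.class);

    public void createOrder(String orderId) {
        // With the PatternLayout above this produces a line such as:
        // 2019-04-23 14:31:26||INFO||OrderService||XX_web||order created: 1001
        log.info("order created: {}", orderId);
    }
}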
2. Setting up the Kafka cluster is not covered here; the official documentation is quite complete. In config/server.properties, point the brokers at the ZooKeeper ensemble:

zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
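Before wiring up Logstash, make sure the XX_log topic exists. A hedged example using Kafka's bundled CLI (the partition and replication counts here are assumptions; five partitions lines up with the consumer_threads => 5 setting in step 4):

bin/kafka-topics.sh --create \
  --zookeeper 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183 \
  --partitions 5 \
  --replication-factor 1 \
  --topic XX_log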


3. Create a dynamic Logstash template:

{
  "template": "*",
  "settings": {
    "index.refresh_interval": "5s",
    "number_of_replicas": "0",
    "number_of_shards": "3"
  },
  "mappings": {
    "_default_": {
      "_all": {
        "enabled": false
      },
      "dynamic_templates": [
        {
          "message_field": {
            "match": "message",
            "match_mapping_type": "string",
            "mapping": {
              "type": "string",
              "index": "analyzed"
            }
          }
        },
        {
          "string_fields": {
            "match": "*",
            "match_mapping_type": "string",
            "mapping": {
              "type": "string",
              "index": "not_analyzed"
            }
          }
        }
      ],
      "properties": {
        "dateTime": {
          "type": "date",
          "format": "yyyy-MM-dd HH:mm:ss"
        },
        "@version": {
          "type": "integer",
          "index": "not_analyzed"
        },
        "context": {
          "type": "string",
          "index": "analyzed"
        },
        "level": {
          "type": "string",
          "index": "not_analyzed"
        },
        "class": {
          "type": "string",
          "index": "not_analyzed"
        },
        "server": {
          "type": "string",
          "index": "not_analyzed"
        }
      }
    }
  }
}
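Save the template to the path referenced in the Logstash output below (E:\logstash\template\template_log.json). Since the output sets manage_template => true, Logstash pushes the template to ES at startup; listing the installed templates is a quick way to verify:

curl -XGET 'http://127.0.0.1:9200/_template?pretty'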

4. Configure Logstash:

input {
    kafka {
        zk_connect => "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183"
        group_id => "logstash"
        topic_id => "XX_log"
        reset_beginning => false
        consumer_threads => 5
        decorate_events => true
    }
}
filter {
    mutate {
        split => ["message", "||"]
        add_field => { "dateTime" => "%{[message][0]}" }
        add_field => { "level" => "%{[message][1]}" }
        add_field => { "class" => "%{[message][2]}" }
        add_field => { "server" => "%{[message][3]}" }
        add_field => { "context" => "%{[message][4]}" }
        remove_field => ["message"]
    }
    date {
        # parse the extracted dateTime field into @timestamp
        match => ["dateTime", "yyyy-MM-dd HH:mm:ss"]
    }
}
output {
    elasticsearch {
        hosts => ["127.0.0.1:9200"]
        index => "XX_log-%{+YYYY-MM}"
        codec => "json"
        manage_template => true
        template_overwrite => true
        flush_size => 50000
        idle_flush_time => 10
        workers => 2
        template => "E:\logstash\template\template_log.json"
    }
}

With index => "XX_log-%{+YYYY-MM}", logs are stored in ES under one index per year and month; Logstash reads the log messages from the Kafka cluster.
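To make the filter concrete, here is a sample pipe-delimited line (the values are hypothetical) and the fields the mutate block extracts from it:

Incoming Kafka message, as produced by the PatternLayout from step 1:
2019-04-23 14:31:26||INFO||OrderService||XX_web||order created: 1001

After the split and add_field steps, the event carries:
dateTime => "2019-04-23 14:31:26" (also parsed into @timestamp by the date filter)
level => "INFO"
class => "OrderService"
server => "XX_web"
context => "order created: 1001"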

5. Setting up the ZK cluster is also not covered here; there is plenty of material online: http://blog.csdn.net/shirdrn/article/details/7183503

6. Setting up the ES cluster is fairly simple; it is usable without tuning many parameters. http://blog.csdn.net/xgjianstart/article/details/52192675

7. Configure Kibana:

server.port: 5601  # service port
# The host to bind the server to.
server.host: "115.28.240.113"
elasticsearch.url: "http://127.0.0.1:9200"  # ES address (cluster entry point)
kibana.index: "kibana"
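Once Kibana is up on port 5601, create an index pattern matching XX_log-* and select @timestamp (populated by the date filter above) as the time field; level, class, and server are not_analyzed, so they should be directly usable for filtering and aggregations.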


8. Versions: JDK 1.7, ES 2.4, Logstash 2.4, Kafka 2.10, Kibana 4.6.4.