版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/xuguokun1986/article/details/49452101
本片博客的场景:具有一定格式的数据首先被推送到Kafka活着是redis(本实验选择的是Kafka),当数据从Kafka出来之后,被送到Logstash进行简单的处理,然后数据从Logstash出来再被存储进Elasticsearch中。
首先需要说明的一点是,开始用的是Elasticsearch-1.7.1、Logstash-1.5.4、JDK-1.7.0_79,在将es配置成为Logstash-1.5.4的output之后,运行logstash,提示JDK-1.7.0_79已经不支持这套环境,所以必须苦逼的更换jdk,换成了JDK-1.8.0_65。这是一个大坑,所以请后来的朋友们一定要注意。
本次的整体环境依然是我在以往的博客中的环境:三台机器(10.10.16.252、10.10.16.253、10.10.16.249)。这三台机器中分别安装了Kafka、Elasticsearch-1.7.1、Logstash-1.5.4、JDK-1.8.0_65(其他在本次试验中不适用就没有提到)。
1.创建一个topic,其名字是logStash,然后对/home/hadoop1/bms/Logstash-1.5.4/conf目录下的kafkaInput_esOutPut1.conf进行配置。配置的内容如下:
input {
kafka {
zk_connect => "10.10.16.252:2181,10.10.16.253:2181,10.10.16.249:2181"
group_id => "test-consumer-group"
topic_id => "logStash"
reset_beginning => false # boolean (optional), default: false
consumer_threads => 5 # number (optional), default: 1
decorate_events => true # boolean (optional), default: false
}
}
filter{
mutate{
#以:号分割message内容,分割后以数据方式显示。
#比如abc:efg => message[0] = abc message[1]=efg
split => ["message",","]
}
#第一个数据的内容中ORA-xxxxx这种格式,则这条内容是ora错误。添加二个字段
mutate{
add_field => {
"source_Ip" => "%{[message][0]}"
"source_Port" => "%{[message][1]}"
"dest_Ip" => "%{[message][2]}"
"dest_Port" => "%{[message][3]}"
}
}
}
output {
elasticsearch {
host => "localhost"
}
}
2.然后在三台机器上分别启动logstash。
./logstash agent -f ../conf/kafkaInput_esOutPut1.conf
3.然后启动kafka的producer
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic logStash
4.然后查看数据是不是已经推送到es
curl -XGET 'localhost:9200/logstash-2015.10.27/_search'
注意:logstash-2015.10.27是index的名字,也就是在output中默认配置项。该项操作是在哪天做的,生成的index的名字就是logstash-“当前日期“,比如我实验的日期是:2015.10.17,那么生成的index的名字就是logstash-2015.10.27,这样来管理index的一个好处是可以将数据按天进行整理,当需要删除数据的时候就可以根据该规则去匹配。