设为首页 加入收藏

TOP

elasticsearch+logstash+kafka+beat搭建日志审计体系
2019-05-11 02:37:25 】 浏览:66
Tags:elasticsearch logstash kafka beat 搭建 日志 审计 体系

序言:本文搭建日志审计系统,8g内存+1T硬盘,单机实现每秒3000eps的吞吐量,主要架构如下:

客户端日志监听工具:
    evtsys:监听主机系统,网络设备的日志
    filebeat:监听应用日志
日志服务器接收日志:
    logstash:监听客户机发送的日志,做简单的处理,规整日志
日志服务器存储日志:
    elasticsearch:存储日志,类似于nosql数据库,按天建立索引
日志缓存队列:
    根据日志吞吐量进行选择:首选kafka,activeMQ、rabbitMQ次之,使用ZeroMQ技术成本偏高  

整体框架如下:

一 logstash安装配置

logstash依赖JaVA环境,首先安装jdk,略下载logstash:进入es官网下载最新版本logstash。
elasticsearch官网:https://www.elastic.co/cn/downloads 
(1)下载logstash压缩文件:logstash-5.4.1.tar.gz
(2)赋权:chmod +x logstash-5.4.1.tar.gz
(3)解压缩:tar -zxvf logstash-5.4.1.tar.gz
(4)在config目录下创建:test.conf文件
(5)修改test.conf配置文件,如下:
input {
  # 接收syslog日志
  syslog {
   port => "514"
   type => "syslog"
   codec=>plain{charset=>"ISO-8859-1"}
  }
  # 接收snmptrap日志
  snmptrap {
   port => "162"
   type => "snmpTrap"
  }
  #接收filebeat的日志
  beats {
  port => "5044"
  type => "app"
  }
}

filter {
  #去掉换行符
  mutate{
   gsub => [ "message", "\r", "" ]
   gsub => [ "message", "\n", "" ]
  }
  # 由于应用日志是非标准syslog日志,故针对filebeat的应用日志type(例:tomcat,ip)格式做处理
  if [type] =~ /(,)/ {
    mutate {
     split => ["type",","]
     # 新增字段,原字段转换
     add_field => {
       "hostname" => "%{[host]}"
       "appType" => "%{[type][0]}"
       "isApp" => 1
       "timestamp" => "yyyy-MM-dd HH:mm:ss"
     }
     # 删除不规则字段
     remove_field => [
       "tags",
       "offset",
       "host"
     ] 
     # 转换字段的类型
     convert => { "isApp" => "integer" }
    }  
    mutate {
      add_field => {
       "host" => "%{[type][1]}"
      } 
      remove_field => [
       "type",
       "beat"
      ] 
    }
  }else{ #非应用日志的处理
    mutate {
      rename => [ "logsource", "hostname" ]
    }
    mutate { 
     add_field => {
       "isApp" => 0
     }
     remove_field => [
       "tags",
       "priority"
     ]
     convert => {
      "isApp" => "integer"
     }
    }
  }
  # 规整时间,加8个小时
  ruby {
   code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)"
  }
  mutate {
     remove_field => [
       "@version",
       "tags"
     ]
  }
}
output {
   # 使用kafka作为队列
   kafka{
     topic_id => "loginfoTopic"  #kafka处理接收日志的topic
     # kafka的地址
     bootstrap_servers => "localhost:9092" 
     # 一定要注明输出格式
     codec => "json"
   }
}   
(6)启动logstash
创建日志路径并制定日志目录
nohup /usr/local/wutongyu/tools/logstash/bin/logstash -f /usr/local/wutongyu/tools/logstash/config/test.conf 1>>/opt/log/siem/logstash.log 2>>/opt/log/siem/logstash.log &
二 elasticsearch 安装配置

详见:https://blog.csdn.net/wutongyuWxc/article/details/78888190

三 kafka 安装配置

1.下载最新版kafka 
    Apache官网: http://kafka.apache.org/downloads.html
2.解压kafka
    tar -zxvf kafka_2.12-1.0.0.tgz
3.重命名解压目录名  
    mv kafka_2.12-1.0.0 kafka
4.启动
    进入解压目录:cd kafka
  修改config/zookeeper.properties的日志路径
    启动zookeeper:bin/zookeeper-server-start.sh config/zookeeper.properties &
  修改config/server.properties的日志路径
  启动kafka:bin/kafka-server-start.sh config/server.properties &

kafka结合spring的使用细则详见:http://blog.csdn.net/wutongyuwxc/article/details/79148011

四 客户端日志监听配置

evtsys配置

evtsys用于监听Windows、Linux、交换机、防火墙等系统日志,通过514端口将日志发送到logstash
(1)下载最新版 evtsys;
(2)将evtsys.exe文件拷贝到C:\Windows\System32
(3)以管理员身份打开命令窗口执行如下命令:
    net evtsys stop
    evtsys -i ip:端口
    net evtsys start

filebeat配置

    filebeat用于监听应用系统产生的日志,通过5044端口将日志发送到logstash,根据主机系统的类别和位数下载对应版本filebeat。
(1)进入es官网:https://www.elastic.co/cn/products/beats 下载 filebeat
(2)解压文件:tar -zxvf filebeat-5.4.1-linux-x86_64.tar.gz
(3)修改配置文件filebeat.yml
filebeat.prospectors:
 -
  input_type: log
  paths:
    - /opt/log/imp/localhost_access_log*.txt #tomcat访问日志路径
  exclude_lines: ["(200 )"]  #过滤正常请求
  close_older: 5h #连续5h没有日志产生,就不监听该日志文件
  scan_frequency: 10s #每10s检查一次是否有新日志产生
  document_type: "tomcat,192.168.211.68"

 - 
  input_type: log
  paths:
    - /opt/log/imp/catalina*.log #tomcat错误日志
  exclude_lines: ["^DBG"]  #过滤debug日志
  close_older: 5h
  scan_frequency: 10s
  document_type: "tomcat,192.168.211.68"

 -
  input_type: log
  paths:
    - /dom/loglink/mysql/mysqld.log #mysql错误日志
  exclude_lines: ["([Note])"]  
  scan_frequency: 60s
  document_type: "mysql-error,192.168.211.68"
# 日志通过514端口输出到logstash
output.logstash:
 hosts: ["192.168.211.50:5044"]

(4)启动:
    ./filebeat -e -c filebeat.yml -d "Publish"
或者:
    nohup ./filebeat -e -c filebeat.yml >/dev/null 2>&1 &

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇排序算法练习 下一篇Dockerfile命令说明

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目