[Hadoop] Big Data Project Hands-On (work in progress)
Copyright notice: this is an original post by the author; reproduction without permission is prohibited. https://blog.csdn.net/wawa8899/article/details/82845275

1. Background

The company already runs a large number of business systems that generate a huge volume of logs, which need centralized management, alerting, and monitoring.

Traditional logging solutions do not meet the requirements:

  • ELK: suitable for small and medium-sized companies; it cannot keep up with very large data volumes.
  • Many current monitoring services (such as Zabbix) still cannot deliver second-level notifications; there is a time lag.

External pressure:

  • SLA: 99.99% service availability, i.e. allowed downtime is 0.01% of the time (roughly 53 minutes per year).

2. Requirements

Online log collection -> analysis -> visualization & alerting, using a big data platform to achieve second-level alert notifications.

  • Anticipate problems that are about to occur so they can be prevented; maintain promptly to reduce downtime and protect the SLA.
  • Extended requirement: real-time statistical analysis of service-level logs, e.g. the MySQL slow query log and nginx, Tomcat, httpd, and Linux system logs.
  • https://github.com/Hackeruncle/OnlineLogAnalysis

3. Approach

Process breakdown:

  • Develop a custom plugin, AdvancedExecSource, based on the Flume Exec Source; it assembles each log line into the following format (an example record is shown after this list):

machine name  service name  yyyyMMddHHmmss.SSS  log level  log message

  • Spark Streaming real-time computation: count the ERROR entries per second for each service on each machine, count WARN and ERROR entries per 5 seconds, etc.
  • Count occurrences of high-risk keywords (e.g. Out of Memory); monitor log4j output.
  • Real-time visualization.
  • Notifications via email, SMS, WeChat Work, etc.
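
For example, a record assembled in this format for the NameNode log on hadoop01 might look like the following (the values are illustrative only):

hadoop01 namenode 20180930104418.818 ERROR GC overhead limit exceeded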

4. Architecture Design

5. Software Versions

  • Flume:flume-ng-1.6.0-cdh5.7.0
  • Hadoop:2.6.0-cdh5.7.0
  • Zookeeper:zookeeper-3.4.5-cdh5.7.0
  • Scala:2.11.12
  • Kafka: 0.10 (determined by the Spark Streaming version)
  • Spark:2.3.1
  • InfluxDB:1.6.3
  • Grafana:5.2.4
  • JDK:1.8.0_45
  • Maven:3.5.4
  • MySQL:5.7

6. Implementation

6.1 Environment Deployment

Following the architecture, deploy all components other than Flume alongside the company's existing cluster:

  • Install and start Hadoop (HDFS & YARN)
  • Install ZooKeeper in standalone mode and start zkServer
  • Install Kafka and start kafka-server (the official download is slow, so the package was uploaded to a cloud drive first)
  • Install Spark
  • Install and start InfluxDB
  • Install and start MySQL
  • Install and start Grafana

6.2 Modify the Flume Source Code

Source code changes (a minimal sketch of the event assembly follows this list):

  • Download the flume-ng-1.6.0-cdh5.7.0 source code and add two parameters to the exec source: hostname and servicename
  • Change the collected log format to JSON
  • Add hostname and servicename to the JSON
  • Change the log output format to JSON (JSON carries its own schema)
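
The actual ExecSource_JSON code is not reproduced in this post; the following is a minimal Java sketch, assuming a fastjson dependency, of how a line read by the exec command could be wrapped into a JSON Flume event carrying hostname and servicename (class and field names are illustrative, not the project's real code):

import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Date;

import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;

// Illustrative helper: turn one raw log line into a JSON-bodied Flume event.
public class JsonEventBuilder {

    public static Event build(String rawLine, String hostname, String servicename) {
        JSONObject json = new JSONObject();
        json.put("hostname", hostname);        // value of a1.sources.r1.hostname
        json.put("servicename", servicename);  // value of a1.sources.r1.servicename
        json.put("time", new SimpleDateFormat("yyyyMMddHHmmss.SSS").format(new Date()));
        json.put("line", rawLine);             // the original log line
        // The JSON string becomes the Flume event body sent to the channel.
        return EventBuilder.withBody(json.toJSONString(), StandardCharsets.UTF_8);
    }
}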

Source build:

  • Compile the modified Flume source code

Flume installation:

  • Upload the compiled Flume package to the server, then extract and install it
[hadoop@hadoop000 software]$ tar -zxvf apache-flume-1.7.0-bin-ruozedata.tar.gz -C ~/app/
[hadoop@hadoop000 software]$ cd ~/app/
[hadoop@hadoop000 app]$ ln -s apache-flume-1.7.0-bin flume
[hadoop@hadoop000 ~]# vi /etc/profile
export FLUME_HOME=/home/hadoop/app/flume
export PATH=$FLUME_HOME/bin:$PATH
[hadoop@hadoop000 ~]# source /etc/profile
[hadoop@hadoop000 ~]$ flume-ng version
Flume 1.7.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: 511d868555dd4d16e6ce4fedc72c2d1454546707
Compiled by bessbd on Wed Oct 12 20:51:10 CEST 2016
From source with checksum 0d21b3ffdc55a07e1d08875872c00523
[hadoop@hadoop000 ~]$

6.3 Start Kafka and Create Topics

[hadoop@hadoop000 kafka]$ nohup kafka-server-start.sh config/server.properties &
[hadoop@hadoop000 kafka]$ netstat -nltp | grep 9092
(Not all processes could be identified, non-owned process info
 will not be shown, you would have to be root to see it all.)
tcp6       0      0 192.168.1.8:9092        :::*                    LISTEN      32169/java
[hadoop@hadoop000 kafka]$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Created topic "test".
[hadoop@hadoop000 kafka]$ bin/kafka-topics.sh --list --zookeeper localhost:2181
__consumer_offsets
onlinelogs
test
[hadoop@hadoop000 kafka]$

6.4 Collect Logs with Flume and Write Them to a Kafka Topic

Write the Flume agent configuration:

[hadoop@hadoop01 flume]$ vi conf/exec_memory_kafka.properties
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the custom exec source
a1.sources.r1.type = com.onlinelog.analysis.ExecSource_JSON
a1.sources.r1.command = tail -F /home/hadoop/data/onlinelog/hadoop-cmf-hdfs-NAMENODE-hadoop01.log.out
a1.sources.r1.hostname = hadoop01
a1.sources.r1.servicename = namenode


# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = logs_nn
a1.sinks.k1.kafka.bootstrap.servers = 192.168.1.8:9092
a1.sinks.k1.kafka.flumeBatchSize = 6000
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.keep-alive = 90
a1.channels.c1.capacity = 2000000
a1.channels.c1.transactionCapacity = 6000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Start the agent; it monitors the log file for new lines and writes them to the corresponding Kafka topic:

[hadoop@hadoop01 flume]$ kill -9 $(pgrep -f flume)
[hadoop@hadoop01 flume]$ nohup bin/flume-ng agent -c conf -f conf/exec_memory_kafka.properties -n a1 -Dflume.root.logger=INFO,console &

Verify that Flume is writing to the Kafka topic successfully (look for "Successfully registered new MBean"):

[hadoop@hadoop01 flume]$ tail -F nohup.out 
	ssl.keymanager.algorithm = SunX509
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	send.buffer.bytes = 131072
	linger.ms = 1

2018-09-30 10:44:18,818 (lifecycleSupervisor-1-0) [INFO - org.apache.kafka.common.utils.AppInfoParser$AppInfo.<init>(AppInfoParser.java:82)] Kafka version : 0.9.0.1
2018-09-30 10:44:18,818 (lifecycleSupervisor-1-0) [INFO - org.apache.kafka.common.utils.AppInfoParser$AppInfo.<init>(AppInfoParser.java:83)] Kafka commitId : 23c69d62a0cabf06
2018-09-30 10:44:18,822 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SINK, name: k1: Successfully registered new MBean.
2018-09-30 10:44:18,822 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SINK, name: k1 started

6.5 Create a Database in InfluxDB

It will store the log-level statistics:

[root@hadoop01 ~]# influx
Connected to http://localhost:8086 version 1.6.3
InfluxDB shell version: 1.6.3
> create database online_log_analysis
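
For reference, a minimal sketch of writing a single count point into this database with the influxdb-java client (the measurement and tag names follow the query output shown in section 6.7; the URL and credentials are assumptions):

import java.util.concurrent.TimeUnit;

import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.Point;

public class InfluxWriteSketch {
    public static void main(String[] args) {
        // Assumed connection details; adjust to the actual InfluxDB instance.
        InfluxDB influxDB = InfluxDBFactory.connect("http://hadoop01:8086", "admin", "admin");
        Point point = Point.measurement("logtype_count")
                .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
                .tag("host_service_logtype", "hadoop01_namenode_ERROR")
                .addField("count", 3L)
                .build();
        // Write into the database created above, using the default retention policy.
        influxDB.write("online_log_analysis", "autogen", point);
        influxDB.close();
    }
}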

6.6 Create the Database and Table in MySQL

mysql> create database onlineloganalysis;
mysql> use onlineloganalysis;
mysql> create table alertinfo_config
    -> (hostname varchar(10),
    -> lineTimestamp timestamp,
    -> logInfo text,
    -> logType varchar(10),
    -> serviceName varchar(10));

6.7 Spark Streaming Consumes the Kafka Topic Logs and Writes Them to InfluxDB

Checkpoint setup:

//checkpoint
//hdfs dfs -mkdir hdfs://10.132.37.38:9000/tmp/spark
//hdfs dfs -mkdir hdfs://10.132.37.38:9000/tmp/spark/checkpointdata
//hdfs dfs -chmod -R 777 hdfs://10.132.37.38:9000/tmp/spark
//hdfs dfs -ls hdfs://10.132.37.38:9000/tmp/spark
jssc.checkpoint("hdfs://10.132.37.38:9000/tmp/spark/checkpointdata");

Run the Spark Streaming job (a minimal sketch of the job is given below):
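
The actual job code is not included in this post; below is a minimal Java sketch, assuming the spark-streaming-kafka-0-10 and fastjson dependencies, of what it does: consume the JSON events produced by the custom Flume source from Kafka, count the records per host/service/log level in each 5-second batch, and hand the counts to a writer (the JSON field names, consumer group, and output handling are assumptions):

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import scala.Tuple2;

public class OnlineLogAnalysisSketch {
    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf().setAppName("OnlineLogAnalysis");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
        jssc.checkpoint("hdfs://10.132.37.38:9000/tmp/spark/checkpointdata");

        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "192.168.1.8:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "onlineloganalysis");   // assumed group id
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("enable.auto.commit", false);

        JavaInputDStream<ConsumerRecord<String, String>> stream =
                KafkaUtils.createDirectStream(
                        jssc,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String>Subscribe(
                                Collections.singletonList("logs_nn"), kafkaParams));

        // Each record value is the JSON event built by the custom Flume source.
        stream.mapToPair(record -> {
                    JSONObject json = JSON.parseObject(record.value());
                    String key = json.getString("hostname") + "_"
                            + json.getString("servicename") + "_"
                            + json.getString("loglevel");   // field name is an assumption
                    return new Tuple2<>(key, 1L);
                })
                .reduceByKey(Long::sum)
                .foreachRDD(rdd -> rdd.foreachPartition(iter -> {
                    // Write each (host_service_logtype, count) pair to InfluxDB here,
                    // e.g. with the influxdb-java snippet shown in section 6.5.
                    while (iter.hasNext()) {
                        Tuple2<String, Long> t = iter.next();
                        System.out.println(t._1 + " -> " + t._2);
                    }
                }));

        jssc.start();
        jssc.awaitTermination();
    }
}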

Simulate log generation:

[hadoop@hadoop01 onlinelog]$ cat hadoop-cmf-hdfs-NAMENODE-hadoop01.log.out.bak >> hadoop-cmf-hdfs-NAMENODE-hadoop01.log.out
[hadoop@hadoop01 onlinelog]$ cat hadoop-cmf-hdfs-NAMENODE-hadoop01.log.out.bak >> hadoop-cmf-hdfs-NAMENODE-hadoop01.log.out
[hadoop@hadoop01 onlinelog]$ cat hadoop-cmf-hdfs-NAMENODE-hadoop01.log.out.bak >> hadoop-cmf-hdfs-NAMENODE-hadoop01.log.out
(... the same command repeated about 20 times to keep appending data ...)
[hadoop@hadoop01 onlinelog]$ 

Query InfluxDB; the log statistics are already coming in:

> select * from logtype_count
name: logtype_count
time                count host_service_logtype
----                ----- --------------------
1538292004042053856 6     hadoop01_namenode_INFO
1538292082207099118 58    hadoop01_namenode_WARN
1538292082207099118 51533 hadoop01_namenode_INFO
1538292082207099118 3     hadoop01_namenode_ERROR
1538292088170705293 83425 hadoop01_namenode_INFO
1538292088170705293 6     hadoop01_namenode_ERROR
1538292088170705293 116   hadoop01_namenode_WARN
1538292092902873984 3     hadoop01_namenode_ERROR
1538292092902873984 58    hadoop01_namenode_WARN
1538292092902873984 87892 hadoop01_namenode_INFO
1538292097956130442 116   hadoop01_namenode_WARN
1538292097956130442 88505 hadoop01_namenode_INFO
1538292097956130442 6     hadoop01_namenode_ERROR
1538292103218355847 88879 hadoop01_namenode_INFO
1538292103218355847 3     hadoop01_namenode_ERROR
1538292103218355847 58    hadoop01_namenode_WARN
1538292108094857436 6     hadoop01_namenode_ERROR
1538292108094857436 116   hadoop01_namenode_WARN
1538292108094857436 88811 hadoop01_namenode_INFO
1538292113515462193 116   hadoop01_namenode_WARN
1538292113515462193 88565 hadoop01_namenode_INFO
1538292113515462193 6     hadoop01_namenode_ERROR
1538292126723912190 88879 hadoop01_namenode_INFO
1538292126723912190 3     hadoop01_namenode_ERROR
1538292126723912190 58    hadoop01_namenode_WARN
1538292133790960057 6     hadoop01_namenode_ERROR
1538292133790960057 116   hadoop01_namenode_WARN
1538292133790960057 89098 hadoop01_namenode_INFO
1538292144645545997 58    hadoop01_namenode_WARN
1538292144645545997 89059 hadoop01_namenode_INFO
1538292144645545997 3     hadoop01_namenode_ERROR
> 

6.8 Visualize with Grafana

Add InfluxDB as a data source in Grafana, configure the query for each panel, and build the dashboard.
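
As an illustrative example (not taken from the original post), a panel against the measurement populated in section 6.7 could use an InfluxQL query along these lines, assuming host_service_logtype is a tag and using Grafana's time macros:

SELECT sum("count") FROM "logtype_count" WHERE $timeFilter GROUP BY time($__interval), "host_service_logtype"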

==================================BACKUPS========================================

6.3 Develop a Custom flume-ng Plugin

1) Pull the Flume source code into IDEA and, following the exec source code, add two parameters: hostname and servicename

2) Change the log output format to JSON

Notes:

  • JSON is chosen because it carries its own schema (search for "log4j support JSON"; only the log4j configuration parameters need changing)
  • INFO-level log entries are single lines, so they are unproblematic; WARN/ERROR-level entries require handling of line breaks, special characters, and so on

4. Modify the Flume Code

4.1 Download the Flume Source Code

wget http://mirrors.hust.edu.cn/apache/flume/1.8.0/apache-flume-1.8.0-src.tar.gz

After downloading, extract it and rename it.

4.2 Following the exec source code, add two parameters: hostname and servicename

4.3 The Collected Data Is JSON (JSON carries its own schema)

How to ensure the collected data is in JSON format:

In CDH, modify the log4j configuration parameters (search for "log4j support JSON") so that the emitted log format is JSON.

// TODO START

a. Broken/truncated log lines? (line breaks, special characters)

b. Add hostname and servicename when assembling the JSON

c. Validate the JSON before output: only records that are valid JSON are actually emitted; invalid records are written back to a log4j log and monitored, and this part of the code is then iterated on so that fewer and fewer records fail validation (a minimal sketch follows the TODO block)

// END
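
A minimal sketch of the validation described in item (c), assuming fastjson and slf4j are on the classpath (names are illustrative, not the project's real code):

import com.alibaba.fastjson.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Illustrative gate: emit only well-formed JSON, send the rest back to a log.
public class JsonGate {
    private static final Logger invalidLog = LoggerFactory.getLogger("invalid-json");

    // Returns true when the record was accepted for output.
    public static boolean emitIfValid(String record) {
        try {
            JSON.parseObject(record);   // throws if the record is not valid JSON
            // ... hand the record to the real output path here ...
            return true;
        } catch (Exception e) {
            // Invalid records go to a separate log so they can be monitored
            // and the producing code can be iterated on.
            invalidLog.warn("dropped non-JSON record: {}", record);
            return false;
        }
    }
}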

How to ensure the output data is in JSON format:

4.4 The Output Must Also Be JSON

4.5 Log Levels

INFO, WARN, ERROR

tail -F /var/log/aaa.log reads the log line by line.

INFO-level log entries are single lines, so they are unproblematic; WARN/ERROR-level entries require handling of line breaks, special characters, and so on.

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure
a1.sources.r1.channels = c1

5.3 Flume collects the logs and funnels them into Kafka (if offline computation is needed later, they can also be funnelled into HDFS)

5.4 Spark Streaming reads the logs from Kafka and writes them to InfluxDB

5.5 Grafana visualization

5.6 Email integration

Time-series database: http://docs.influxdata.com/influxdb/v1.6/introduction/installation/

Visualization: https://grafana.com/

Grafana + Cassandra

Cassandra operations monitoring: https://grafana.com/dashboards/371

https://community.grafana.com/t/cassandra-datasource-for-grafana/2729

A time-series database on top of Cassandra: https://mannekentech.com/2017/01/28/playing-with-kairos-db/

Then use Grafana for visualization.

Log level

Detailed log message; user-defined alert keywords

Project summary:

1. This blog post is the V1 version

2. The two-day course is the V2 version

3. Requirements walkthrough + cluster deployment video (to be provided later)

4. First milestone: reproduce the project and get the pipeline running end to end: Flume collection (use the ready-made jar, do not modify the source code) -> Kafka -> InfluxDB -> visualization

5. Second milestone: port the project code from Java to Scala

See J哥's GitHub blog: project phase-two overview, points 1-7

First milestone: screenshots

Second milestone: commit the code to GitLab; when porting Java to Scala, convert only the main project code:

Convert:

1. The main class

2. The broadcast variables

Do not convert: the InfluxDB part

In the end the Java code calls the Scala code.

This can be done by creating a new package directly inside the original project,

or by creating a new project and copying the InfluxDB part over.

Pitfall: objects that cannot be serialized (search the error online) + packaging the job and running it on the cluster on YARN (a sketch of the usual fix follows)
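
A minimal sketch of the usual workaround for the serialization pitfall: do not capture the InfluxDB client in the Spark closure on the driver; create it per partition on the executors instead (illustrative code, not the project's actual writer):

import java.util.concurrent.TimeUnit;

import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.Point;
import scala.Tuple2;

public class SerializationSafeWriter {
    // counts: the (host_service_logtype, count) pairs from the streaming job.
    public static void write(JavaPairDStream<String, Long> counts) {
        counts.foreachRDD(rdd -> rdd.foreachPartition(iter -> {
            // Created inside the partition, so no non-serializable object
            // has to be shipped from the driver inside the closure.
            InfluxDB influxDB = InfluxDBFactory.connect("http://hadoop01:8086", "admin", "admin");
            while (iter.hasNext()) {
                Tuple2<String, Long> t = iter.next();
                influxDB.write("online_log_analysis", "autogen",
                        Point.measurement("logtype_count")
                             .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
                             .tag("host_service_logtype", t._1)
                             .addField("count", t._2.longValue())
                             .build());
            }
            influxDB.close();
        }));
    }
}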

Kibana?

Graphite

http://opentsdb.net/docs/build/html/user_guide/backends/cassandra.html

================================================================================
