设为首页 加入收藏

TOP

基于flume的日志收集系统配置
2018-12-12 02:01:58 】 浏览:71
Tags:基于 flume 日志 收集 系统 配置

大数据系统中通常需要采集的日志有:

  1. 系统访问日志
  2. 用户点击日志
  3. 其他业务日志(比如推荐系统的点击日志)

在收集日志的时候,一般分为三层结构:采集层、汇总层和存储层,而不是直接从采集端将数据发送到存储端,这样的好处有:

  1. 如果存储端如Hadoop集群、Kafka等需要停机维护或升级,对部署在应用服务器上的采集端没有影响,只需要汇总层做好数据的缓冲,在存储端恢复正常后继续写入数据。
  2. 采集层只负责数据的采集,由汇总层统一维护数据的路由逻辑(比如发送到hdfs还是kafka?),由于采集端所在的应用服务器一般数量较多,且会随着业务的扩展而不断增加,这种方式可以降低日志采集配置的维护成本,降低大数据应用对业务系统的影响

基于三层结构的flume日志采集系统架构一般如下图所示:

说明:

对于采集层agent,一般要求尽快将日志发送出去,避免在采集层堆积数据,所以使用memory的channel,sink统一使用avro;对于汇总层agent,要求可以尽量保证数据的缓冲,所以使用file channel,并且尽量调大容量,对于要求实时处理的数据,可以使用SSD的磁盘以提高处理速度,source统一使用avro。

各agent的配置如下:

【Agent-1】:位于采集层,用于收集应用A产生的日志,这些日志需要保存到HDFS中用于离线分析,同时也需要发送给Kafka用于实时的计算(如用户点击日志)。

a1.sources = src_1

a1.channels = ch_m_1

a1.sinks = sink_1 sink_2

# 配置 source:从指定目录读取日志数据

a1.sources.src_1.type = spooldir

a1.sources.src_1.channels = ch_m_1

a1.sources.src_1.spoolDir = /data/nginx/log/user_click/

a1.sources.src_1.includePattern=^.*$

# 日志数据一般是按照size或者时间切换,应用正在写入的文件不能读取,否则flume会报错,所以需要把这个文件排除掉。比如正在写入的是user.click.log,切换后的是user.click.log.yyyy.mm.dd.hh,则需要把user.click.log文件排除掉:

a1.sources.src_1.ignorePattern=^.*log$

# 配置 channel

a1.channels.ch_m_1.type = memory

# channel中可以缓存的event数量的最大值。可以根据单个event所占空间和可用的内存来评估可以缓存的event的最大数量

a1.channels.ch_m_1.capacity = 100000

# 一个事务中可批量接收或发送的event数量的最大值

a1.channels.ch_m_1.transactionCapacity = 5000

# 配置 sinks:多个sink节点,用于负载均衡

a1.sinks.sink_1.channel = ch_m_1

a1.sinks.sink_1.type = avro

# 汇总层节点地址

a1.sinks.sink_1.hostname = 192.168.1.110

# 汇总层节点avro监听端口

a1.sinks.sink_1.port = 44446

a1.sinks.sink_2.channel = ch_m_1

a1.sinks.sink_2.type = avro

# 汇总层节点地址

a1.sinks.sink_2.hostname = 192.168.1.111

# 汇总层节点avro监听端口

a1.sinks.sink_2.port = 44446

# sink端的负载均衡配置

a1.sinkgroups = g1

a1.sinkgroups.g1.sinks = sink_1 sink_2

a1.sinkgroups.g1.processor.type = load_balance

# 使用轮询的方式选择sink

a1.sinkgroups.g1.processor.selector = round_robin

【Agent-2】:位于采集层,用于收集应用B产生的日志,这些日志只需要发送给Kafka用于实时的计算。

a2.sources = src_1

a2.channels = ch_m_1

a2.sinks = sink_1 sink_2

# 配置 source:从指定目录读取日志数据

a2.sources.src_1.type = spooldir

a2.sources.src_1.channels = ch_m_1

a2.sources.src_1.type = exec

# 使用tail -f 命令监听日志的修改,可以近实时的采集到需要的日志数据

a2.sources.src_1.command = tail -F /data/nginx/log/user_action/action.log

# 配置 channel

a2.channels.ch_m_1.type = memory

# channel中可以缓存的event数量的最大值。可以根据单个event所占空间和可用的内存来评估可以缓存的event的最大数量

a2.channels.ch_m_1.capacity = 100000

# 一个事务中可批量接收或发送的event数量的最大值

a2.channels.ch_m_1.transactionCapacity = 5000

# 配置 sinks:多个sink节点,用于负载均衡

a2.sinks.sink_1.channel = ch_m_1

a2.sinks.sink_1.type = avro

# 汇总层节点地址

a2.sinks.sink_1.hostname = 192.168.1.110

# 汇总层节点avro监听端口,由于处理逻辑不一样,所以此处汇总层的avro端口与【Agent-1】不同

a2.sinks.sink_1.port = 44447

a2.sinks.sink_2.channel = ch_m_1

a2.sinks.sink_2.type = avro

# 汇总层节点地址

a2.sinks.sink_2.hostname = 192.168.1.111

# 汇总层节点avro监听端口

a2.sinks.sink_2.port = 44447

# sink端的负载均衡配置

a2.sinkgroups = g1

a2.sinkgroups.g1.sinks = sink_1 sink_2

a2.sinkgroups.g1.processor.type = load_balance

# 使用轮询的方式选择sink

a2.sinkgroups.g1.processor.selector = round_robin

【Agent-3】:位于汇总层,用于处理【Agent-1】发送过来的数据,由于需要同时将数据发送给HDFS和Kafka,所以配置了两个channel。

a3.sources = src_1

a3.channels = ch_f_1 ch_f_2

a3.sinks = sink_1 sink_2

# 配置 source: 用于处理【agent-1】发过来的数据,将数据分别发送到hdfs和kafka,所以需要配置两个channel

a3.sources.src_1.channels = ch_f_1 ch_f_2

a3.sources.src_1.type = avro

a3.sources.src_1.bind = 192.168.1.110

a3.sources.src_1.port = 44446

# 调整处理的线程数,提高处理能力

a3.sources.src_1.threads = 8

# 配置channel

# ch_f_1:对应hdfs sink

a3.channels.ch_f_1.type = file

# 文件检查点保存路径。File Channel在接收到source的数据后马上写入磁盘文件中,然后通过一个内存队列来保存已被Source写入但还未被Sink消费的Event的指针(Event指针指的是Event在磁盘上的数据文件中的存放位置)。检查点指的是channel每隔一段时间(checkpointInterval)将内存队列的“快照”持久化到磁盘文件以避免agent重启后内存队列信息的丢失。为了保证“快照”的完整性,在将内存队列持久到磁盘文件时需要锁定channel,就是说此过程Source不能写Channel,Sink也不能读Channel。

a3.channels.ch_f_1.checkpointDir = /data/flume/channels/user_click_event/checkpoint

# 是否需要备份检查点。为保证数据的可靠性,一般设置为true

a3.channels.ch_f_1.useDualCheckpoints = true

# 备份检查点保存路径,与checkpointDir使用不一样的路径

a3.channels.ch_f_1.backupCheckpointDir = /data/flume/channels/user_click_event/backup

# 检查点执行间隔,以分钟为单位

a3.channels.ch_f_1.checkpointInterval = 60000

# 数据保存路径

a3.channels.ch_f_1.dataDirs = /data/flume/channels/user_click_event/data

# 未提交的事务数量的最大值,通过调节此数字可以增加channel的吞吐量

a3.channels.ch_f_1.transactionCapacity = 100000

# 可以缓存的event数量的最大值

a3.channels.ch_f_1.capacity = 500000

# 一个存放操作的等待时间值,以秒为单位。默认为3秒,在数据量较大的情况下可以适当调大

a3.channels.ch_f_1.keep-alive = 5

# 单个文件的最大size,以byte为单位,默认为2G。file channel会在单个文件达到最大值后新创建一个文件来保存数据,size过小会导致在数据量大的情况下频繁的创建文件,size过大会则会降低文件读写的效率

a3.channels.ch_f_1.maxFileSize = 5368709120

# ch_f_2:对应kafka sink,使用的也是file channel,配置与上面类似,需要注意的是使用不同的存储路径

a3.channels.ch_f_2.type = file

a3.channels.ch_f_2.checkpointDir = /data2/flume/channels/user_action_event/checkpoint

a3.channels.ch_f_2.useDualCheckpoints = true

a3.channels.ch_f_2.backupCheckpointDir = /data2/flume/channels/user_action_event/backup

a3.channels.ch_f_2.dataDirs = /data2/flume/channels/user_action_event/data

a3.channels.ch_f_2.transactionCapacity = 100000

a3.channels.ch_f_2.capacity = 500000

a3.channels.ch_f_2.checkpointInterval = 60000

a3.channels.ch_f_2.keep-alive = 5

a3.channels.ch_f_2.maxFileSize = 536870912

# 配置sink

# sink_1:将数据写入hdfs

a3.sinks.sink_1.channel = ch_1

a3.sinks.sink_1.type = hdfs

a3.sinks.sink_1.hdfs.path = hdfs://master:8020/user/dw/flume/clicklog/%Y-%m-%d

a3.sinks.sink_1.hdfs.filePrefix = logs

a3.sinks.sink_1.hdfs.inUsePrefix = .

# 跟日志写入一样,hdfs的sink一开始会创建一个文件用于写入,正在写入的文件不能被读取,为确保数据可以被读取处理,sink会按照指定的条件对切分文件,当条件满足时,将正在写入的文件切换成可以读取的文件,然后创建另外一个文件用于写入。有三种切换方式:按照时间间隔;按照文件大小;按照写入的event的数量。使用中可以根据对数据处理的时效性要求进行设置,比如希望更快的读取到数据则按照时间进行切分,如果对时效性要求不高则可以按照文件大小切分

a3.sinks.sink_1.hdfs.rollInterval = 30

a3.sinks.sink_1.hdfs.rollSize = 0

a3.sinks.sink_1.hdfs.rollCount = 0

a3.sinks.sink_1.hdfs.batchSize = 1000

# 默认写入hdfs使用的是sequencefile的文件类型,使用这种格式需要将写入格式设置为Text,否则hive或者impala不能正常读取:

a3.sinks.sink_1.hdfs.writeFormat = Text

# sink_2:将数据写入kafka

a3.sinks.sink_2.type = org.apache.flume.sink.kafka.KafkaSink

a3.sinks.sink_2.channel = ch_f_1

# 指定kafka的topic:即消息的主题,以便kafka的消费端进行区分

a3.sinks.sink_2.kafka.topic = topic1

# kafka服务器地址

a3.sinks.sink_2.kafka.bootstrap.servers = kafka_server01:9092;kafka_server02:9092

# 一个批次中处理的event数量,默认为100。增加此数字可以提高吞吐量,但会降低信息处理的时效性,对于实时性要求高的场景,建议调低此配置

a3.sinks.sink_2.kafka.flumeBatchSize = 50

# kafka缓存时间:在将数据写入kafka时,等待多久以实现批量的导入。默认是直接写入,增大此配置可以增加吞吐量,但是会降低数据同步的时效性

a3.sinks.sink_2.kafka.producer.linger.ms = 1

# 消息压缩格式

a3.sinks.sink_2.kafka.producer.compression.type = snappy

【Agent-4】:位于汇总层,用于处理【Agent-2】发送过来的数据

a4.sources = src_1

a4.channels = ch_f_1

a4.sinks = sink_1

# 配置 source: 用于处理【agent-2】发过来的数据,将数据分别发送到kafka

a4.sources.src_1.channels = ch_f_1

a4.sources.src_1.type = avro

a4.sources.src_1.bind = 192.168.1.110

a4.sources.src_1.port = 44447

a4.sources.src_1.threads = 8

# 配置channel

a4.channels.ch_f_1.type = file

a4.channels.ch_f_1.checkpointDir = /data/flume/channels/realtime_event/checkpoint

a4.channels.ch_f_1.useDualCheckpoints = true

a4.channels.ch_f_1.backupCheckpointDir = /data/flume/channels/realtime_event/backup

a4.channels.ch_f_1.dataDirs = /data/flume/channels/realtime_event/data

a4.channels.ch_f_1.transactionCapacity = 100000

a4.channels.ch_f_1.capacity = 500000

a4.channels.ch_f_1.checkpointInterval = 60000

a4.channels.ch_f_1.keep-alive = 5

a4.channels.ch_f_1.maxFileSize = 5368709120

# 配置sink

a4.sinks.sink_2.type = org.apache.flume.sink.kafka.KafkaSink

a4.sinks.sink_2.channel = ch_f_1

a4.sinks.sink_2.kafka.topic = topic2

a4.sinks.sink_2.kafka.bootstrap.servers = kafka_server01:9092;kafka_server02:9092

a4.sinks.sink_2.kafka.flumeBatchSize = 20

a4.sinks.sink_2.kafka.producer.linger.ms = 1

a4.sinks.sink_2.kafka.producer.compression.type = snappy

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇flume读取日志数据写入kafka &nbs.. 下一篇flume环境搭建、整合kafka、读取..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目