设为首页 加入收藏

TOP

Kafka实战——Java程序日志通过log4j到flume再到Kafka
2018-12-05 02:26:31 】 浏览:95
Tags:Kafka 实战 Java 程序 日志 通过 log4j flume 再到

目的:实现一个简单的测试,flume收集程序的日志信息,再将其输出到Kafka中。

Java程序:win10

flume服务、Kafka服务:centos虚拟机

1. 写Java程序

(1)导入依赖包:flume-ng-log4jappender、slf4j-simple

    <dependency>
        <groupId>org.apache.flume.flume-ng-clients</groupId>
        <artifactId>flume-ng-log4jappender</artifactId>
        <version>1.8.0</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.6.6</version>
    </dependency>

(2)新建log4j.properties文件,并添加如下配置:

日志输出到两个地方:控制台(方便查看)和 flume服务器(对输入的消息进行缓冲存储并输出到其它地方)

log4j.rootLogger=INFO,console,flume  #日志级别,appenderName, appenderName

# console appender config
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %p [%c:%L] - %m%n

# flume appender config
log4j.appender.flume=org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname=192.168.23.128   # flume所在服务器地址,可以是本地,也可是远程
log4j.appender.flume.Port=44444
log4j.appender.flume.UnsafeMode=true
log4j.appender.flume.layout=org.apache.log4j.PatternLayout
log4j.appender.flume.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %p [%c:%L] - %m%n

(3)编写简单应用程序

import org.apache.log4j.Logger;

public class ProducerLog {
    protected static final Logger LOG=Logger.getLogger(ProducerLog.class);

    public static void main(String[] args) {
        LOG.debug("这是一条debug级别的日志!");
        LOG.info("这是一条info级别的日志!");
        LOG.error("这是一条error级别的日志!");
        LOG.fatal("这是一条fatal级别的日志!");

//        LOG.debug("1");
//        LOG.info ("2");
//        LOG.error("3");
//        LOG.fatal("4");
    }
}

在没启动flume服务时运行程序,会出现如下错误(Appender已关闭或没有正确启动):

所以接下来要先配置flume服务...

2. 配置flume服务

在flume的conf下创建log4j-flume-kafka.properties文件,source端监听log4j发送消息的端口,sink端绑定到Kafka的log4j-flume-kafka的topic中。


# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = 192.168.23.128
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers=192.168.23.128:9092
a1.sinks.k1.kafka.topic=log4j-flume-kafka
a1.sinks.k1.serializer.class=kafka.serializer.StringEncoder
a1.sinks.k1.kafka.producer.acks=1
a1.sinks.k1.custom.encoding=UTF-8


# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3. 创建kafka topic

kafkaXXX/bin/kafka-topics.sh --zookeeper 192.168.23.128:2181 --create --topic log4j-flume-kafka --partitions 1 --replication-factor 1

4.启动kafka消费者,消费log4j-flume-kafka数据

 kafkaXXX/bin/kafka-console-consumer.sh --bootstrap-server 192.168.23.128:9092 --topic log4j-flume-kafka --from-beginning

5.启动flume服务

flumeXXX/bin/flume-ng agent -n a1 -c conf -f conf/log4j-flume-kafka.properties

6.运行Java程序,观察Java console及Kafka消费端

到此,一个简单的log4j-flume-kafka应用就搭建好了。

后记:

实际应用中,flume常和应用程序部署在同一机器上,应用程序将日志写入文件中,flume再以监听命令的方式(tail命令打开文件)对该文件进行监听,再把其传入到Kafka集群中。即flume的source端应配置为:

a1.sources.r1.type = exec #指定源类型为Linux 命令
a1.sources.r1.command = tail -f /tmp/test.log #以tail 命令打开文件输出流
a1.sources.r1.fileHeader = false #指定事件不包括头信息

这样,Kafka和flume集群故障时,都不会影响到应用程序的正常运行。flume成了Kafka的一个producer,因为flume是一个轻服务应用,可在每台应用服务器上都部署一个。

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇【kafka】Flume+Kafka+logstash+E.. 下一篇Windows64环境下   使用Flum..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目