设为首页 加入收藏

TOP

SpringBoot之Flume整合Logback
2019-04-20 02:04:03 】 浏览:95
Tags:SpringBoot Flume 整合 Logback

我还是喜欢比较浓一点的茶,比如,金骏眉。


前言
Kafka的确可以处理千万条甚至更多的数据,但是这些数据从哪里来呢?如果是处于开发中的项目,我们的确可以在代码中主动生产消息并推送到kafka;如果是那些已经写好了的项目,该怎么将消息推送到kafka呢,此时我们就可以借助Flume等框架来完成数据的采集,进而推送给kafka
Flume一定要结合日志框架(如:Log4j、Log4j2、Logback等)和消息组件(如:Kafka、RabbitMQ、ActiveMQ、RocketMQ)一起使用。
提示:在实际使用时,往往还会在上述基础上整合hadoop(或storm、spark等大数据框架)往往还会使用的数据分析框架。

本人测试时软硬件环境说明
Windows10、IntelliJ IDEA、SpringBoot 2.1.3.RELEASE、apache-flume-1.9.0-bin.tar.gz。

准备工作:

注:安装Flume较简单,这里不再给出。

Flume整合Logback:

第一步:新建一个SpringBoot项目并引入logback-flume依赖。

给出完整pom.xml:

<xml version="1.0" encoding="UTF-8">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.3.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.aspire</groupId>
    <artifactId>logback-flume</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>logback-flume</name>
    <description>SpringBoot整合Logback + Flume</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- 引入logback-flume的依赖 -->
        <!-- https://www.mvnjar.com/com.teambytes.logback/logback-flume-appender_2.10/0.0.9/detail.html -->
        <dependency>
            <groupId>com.teambytes.logback</groupId>
            <artifactId>logback-flume-appender_2.10</artifactId>
            <version>0.0.9</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

注:SpringBoot天生自带logback,所以我们这里只需要引入一个logback-flume依赖即可。

第二步:在资源文件夹resources下创建logback日志配置文
件logback-spring.xml,并进行相应配置。

<xml version="1.0" encoding="UTF-8">

<configuration>

    <appender name="consoleAppender"
              class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{yyy MMM dd HH:mm:ss.SSS} [%thread] %-5level %logger{36}:%L- %msg%n
            </pattern>
        </encoder>
    </appender>

    <!--
        name:自取即可,
        class:加载指定类(ch.qos.logback.core.rolling.RollingFileAppender类会将日志输出到>>>指定的文件中),
        patter:指定输出的日志格式 file:指定存放日志的文件(如果无,则自动创建) rollingPolicy:滚动策略>>>每天结束时,都会将该天的日志存为指定的格式的文件
        FileNamePattern:文件的全路径名模板 (注:如果最后结尾是gz或者zip等的话,那么会自动打成相应压缩包)
    -->
    <appender name="fileAppender"
              class="ch.qos.logback.core.rolling.RollingFileAppender">
        <!-- 把日志文件输出到:项目启动的目录下的log文件夹(无则自动创建)下 -->
        <file>log/logFile.log</file>
        <!-- 把日志文件输出到:name为logFilePositionDir的property标签指定的位置下 -->
        <!-- <file>${logFilePositionDir}/logFile.log</file> -->
        <!-- 把日志文件输出到:当前磁盘下的log文件夹(无则自动创建)下 -->
        <!-- <file>/log/logFile.log</file> -->
        <rollingPolicy
                class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <!-- TimeBasedRollingPolicy策略会将过时的日志,另存到指定的文件中(无该文件则创建) -->
            <!-- 把因为 过时 或 过大  而拆分后的文件也保存到目启动的目录下的log文件夹下  -->
            <fileNamePattern>log/logFile.%d{yyyy-MM-dd}.%i.log
            </fileNamePattern>
            <!-- 设置过时时间(单位:<fileNamePattern>标签中%d里最小的时间单位) -->
            <!-- 系统会删除(分离出去了的)过时了的日志文件 -->
            <!-- 本人这里:保存以最后一次日志为准,往前7天以内的日志文件 -->
            <MaxHistory>
                7
            </MaxHistory>
            <!-- 滚动策略可以嵌套;
                     这里嵌套了一个SizeAndTimeBasedFNATP策略,
                        主要目的是: 在每天都会拆分日志的前提下,
                        当该天的日志大于规定大小时,
                        也进行拆分并以【%i】进行区分,i从0开始
            -->
            <timeBasedFileNamingAndTriggeringPolicy
                    class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
                <maxFileSize>5MB</maxFileSize>
            </timeBasedFileNamingAndTriggeringPolicy>
        </rollingPolicy>
        <encoder>
            <pattern>%d{yyy MMM dd HH:mm:ss.SSS} [%thread] %-5level %logger{36}:%L- %msg%n
            </pattern>
        </encoder>
    </appender>

    <appender name="flumeTest" class="com.teambytes.logback.flume.FlumeLogstashV1Appender">
        <flumeAgents>
            10.8.109.36:44444
        </flumeAgents>
        <flumeProperties>
            connect-timeout=4000;
            request-timeout=8000
        </flumeProperties>
        <batchSize>100</batchSize>
        <reportingWindow>1000</reportingWindow>
        <additionalAvroHeaders>
            myHeader = myValue
        </additionalAvroHeaders>
        <application>JustryDeng's Application</application>
        <layout class="ch.qos.logback.classic.PatternLayout">
            <pattern>%d{HH:mm:ss.SSS} %-5level %logger{36} - \(%file:%line\) - %message%n%ex</pattern>
        </layout>
    </appender>

    <logger name="com" level="info">
        <appender-ref ref="flumeTest"/>
    </logger>

    <root level="info">
        <appender-ref ref="consoleAppender"/>
    </root>
</configuration>

提示:更多关于logback配置文件的信息可参考https://blog.csdn.net/justry_deng/article/details/81230155

第三步:修改Flume所在服务器上的Flume的配置文件,将数据传输格式改为avro。

注:avro是hadoop的一个子项目,提供的功能与thrift、Protocol Buffer类似,都支持二进制高效序列化,也
自带RPC机制,但是avro使用起来更简单,无需象thrift那样生成目标语言源代码,目前支持的语言有
java、c#、php、c++等。
追注:Avro就是用来在传输数据之前,将对象转换成二进制流,然后此二进制流达到目标地址后,Avro再
将二进制流转换成对象。.

注:xxxA.sources.xxxB.bind的值如果写的是localhost,那么只能本机才能访问,所以这里要写成服务器的ip
地址,如果在/etc/hosts文件中配置了主机名,那么也可以写主机名(如上图所示)。

第四步:开放Flume对应的端口。

指令为:

# CentOS7开放端口
firewall-cmd --zone=public --add-port=44444/tcp --permanent
# CentOS7重新加载防火墙
firewall-cmd --reload
#(CentOS)查看所有开放的端口
firewall-cmd --zone=public --list-ports	

执行效果如图:

第五步:启动Flume服务

前台启动方式:

# 前台启动
/var/local/flume/bin/flume-ng agent --conf /var/local/flume/conf/ --conf-file /var/local/flume/conf/flume.conf --name a1 -Dflume.root.logger=INFO,console

后台启动方式:

# 后台启动
nohup /var/local/flume/bin/flume-ng agent --conf /var/local/flume/conf/ --conf-file /var/local/flume/conf/flume.conf --name a1 -Dflume.root.logger=INFO,console >/flume.log &

(前台启动)执行效果如图:

……

注:为了方便观察,我们不妨使用前台启动的方式。

第六步:编写一个程序,并且该程序记录日志输出,如:

注:如果要往flume输送日志信息,那么在记录日志时最好不要使用{}占位符,要不然flume收到的信息的对应位置
可能就是占位符{}本身。

第七步:运行该程序,控制台输出:

在flume服务器上,也可看见输出:

由此可见,Flume整合Logback成功!

声明:本文为学习笔记,学习自51CTO,《Kafka消息中间件》,讲师李兴华

^_^ 如有不当之处,欢迎指正

^_^ 学习视频:
《Kafka消息中间件》,讲师李兴华

^_^ 测试代码托管链接:
https://github.com/JustryDeng/CommonRepository

^_^ 本文已经被收录进《程序员成长笔记(四)》,笔者JustryDeng

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇flume知识进阶 三组件支持的各种.. 下一篇log4j集成flume-ng

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目