Configuring log4j + Flume + Elasticsearch
2019-03-29 02:09:19
Tags: log4j, flume, configuration
Copyright notice: everything https://blog.csdn.net/wanbf123/article/details/81561069

1. Configure the pom

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.study.test</groupId>
    <artifactId>log4j2flume</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.flume.flume-ng-clients</groupId>
            <artifactId>flume-ng-log4jappender</artifactId>
            <version>1.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.6.1</version>
        </dependency>
    </dependencies>
</project>

2. Configure log4j.properties

log4j.rootLogger=INFO,flume
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %p [%c:%L] - %m%n

log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = 10.45.17.66
# Must be the port the Flume source listens on (a1.sources.r1.port)
log4j.appender.flume.Port = 44444
log4j.appender.flume.UnsafeMode = true
log4j.appender.flume.layout=org.apache.log4j.PatternLayout
log4j.appender.flume.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %p [%c:%L] - %m%n

3. Configure the Flume source, channel, and sink

a1.sources = r1
a1.sinks = k1
a1.channels = c1

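# Describe/configure the source
# The Flume Log4jAppender sends Avro events, so the receiving source must be of type avro
a1.sources.r1.type = avro
# Bind to all interfaces so that the appender host can connect; restrict this as needed
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444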

# Describe the Elasticsearch sink
a1.sinks.k1.type = elasticsearch
a1.sinks.k1.hostNames = 127.0.0.1:9200,127.0.0.2:9300
a1.sinks.k1.indexName = items
a1.sinks.k1.indexType = item
a1.sinks.k1.clusterName = elasticsearch
a1.sinks.k1.batchSize = 500
a1.sinks.k1.ttl = 5d
a1.sinks.k1.serializer=org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
a1.sinks.k1.channel = c1

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
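
The appender's Port in step 2 has to be the port that source r1 listens on in step 3, and a mismatch only shows up at runtime as a connection failure. Because both files use java.util.Properties syntax, the check can be sketched as below. This is a minimal sketch: the class name PortConsistencyCheck and its methods are hypothetical, and the inline values in main stand in for the real files.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

final class PortConsistencyCheck {

    /** Parses text written in java.util.Properties syntax. */
    static Properties parse(String text) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(text));
        return props;
    }

    /** Returns true when the log4j appender port equals the port of source r1 on agent a1. */
    static boolean portsMatch(Properties log4j, Properties flume) {
        String appenderPort = log4j.getProperty("log4j.appender.flume.Port");
        String sourcePort = flume.getProperty("a1.sources.r1.port");
        if (appenderPort == null || sourcePort == null) {
            return false; // a missing key counts as a mismatch
        }
        return appenderPort.trim().equals(sourcePort.trim());
    }

    public static void main(String[] args) throws IOException {
        // Inline stand-ins for log4j.properties and the Flume agent configuration
        Properties log4j = parse("log4j.appender.flume.Port = 44444\n");
        Properties flume = parse("a1.sources.r1.port = 44444\n");
        System.out.println("ports match: " + portsMatch(log4j, flume));
    }
}
```

To check the real files, replace the StringReader with a FileReader on each path.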

4. Start Flume

bin/flume-ng agent -c conf -f conf/testlog2flume.conf --name a1 -Dflume.root.logger=INFO,console

5. Startup errors

Error 1:

[ERROR - org.apache.flume.sink.elasticsearch.ElasticSearchSink.configure(ElasticSearchSink.java:302)] Could not instantiate event serializer.
java.lang.ClassNotFoundException: org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer
        at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:191)
        at org.apache.flume.sink.elasticsearch.ElasticSearchSink.configure(ElasticSearchSink.java:286)
        at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
        at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:413)
        at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:98)
        at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

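Cause:
The Elasticsearch dependency jars are missing from Flume's classpath.

Solution:
1. Copy the jars from Elasticsearch's lib directory into Flume's lib directory.
2. Add the Elasticsearch dependencies through the environment variable (for example in conf/flume-env.sh):
FLUME_CLASSPATH="/home/elk/elasticsearch-2.1.1/lib/*"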
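
A ClassNotFoundException like the one above means the named class cannot be resolved on the current classpath. The following is a minimal sketch of a probe for that condition; the class name ClasspathCheck is hypothetical, and the probe only reports on the classpath of the JVM it runs in, so it would have to be started with the same classpath as the Flume agent to be meaningful.

```java
final class ClasspathCheck {

    /** Returns true when the class can be resolved, without running its static initializers. */
    static boolean isLoadable(String className) {
        try {
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // LinkageError covers the case where the class is found but one of its dependencies is not
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults to the serializer named in the stack trace; pass another class name as an argument
        String name = args.length > 0
                ? args[0]
                : "org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer";
        System.out.println(name + " loadable: " + isLoadable(name));
    }
}
```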

Error 2:

[ERROR - org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:253)] Unable to start SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@1c9f6ece counterGroup:{ name:null counters:{} } } - Exception follows.
java.lang.NoSuchMethodError: org.elasticsearch.common.transport.InetSocketTransportAddress.<init>(Ljava/lang/String;I)V
        at org.apache.flume.sink.elasticsearch.client.ElasticSearchTransportClient.configureHostnames(ElasticSearchTransportClient.java:143)
        at org.apache.flume.sink.elasticsearch.client.ElasticSearchTransportClient.<init>(ElasticSearchTransportClient.java:77)
        at org.apache.flume.sink.elasticsearch.client.ElasticSearchClientFactory.getClient(ElasticSearchClientFactory.java:48)
        at org.apache.flume.sink.elasticsearch.ElasticSearchSink.start(ElasticSearchSink.java:357)
        at org.apache.flume.sink.DefaultSinkProcessor.start(DefaultSinkProcessor.java:46)
        at org.apache.flume.SinkRunner.start(SinkRunner.java:79)
        at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

Cause:

The Elasticsearch version is too new, so the Flume jars are incompatible with it.

Solution: downgrade Elasticsearch to 1.7.1.
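
The descriptor `<init>(Ljava/lang/String;I)V` in the NoSuchMethodError denotes a constructor that takes a String and an int. Whether the Elasticsearch jars on a given classpath still provide that constructor can be probed with reflection before starting the agent. This is a minimal sketch; the class name ConstructorCheck is hypothetical, and the probe has to run with the same classpath as the Flume agent.

```java
final class ConstructorCheck {

    /** Returns true when the class is present and has a public constructor with these parameter types. */
    static boolean hasConstructor(String className, Class<?>... parameterTypes) {
        try {
            Class<?> type = Class.forName(className, false, ConstructorCheck.class.getClassLoader());
            type.getConstructor(parameterTypes);
            return true;
        } catch (ClassNotFoundException | NoSuchMethodException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Class and signature taken from the NoSuchMethodError above
        String name = "org.elasticsearch.common.transport.InetSocketTransportAddress";
        boolean present = hasConstructor(name, String.class, int.class);
        System.out.println(name + "(String, int) available: " + present);
    }
}
```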

6. Java test code

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.util.Date;

public class WriteLog {
    protected static final Log logger = LogFactory.getLog(WriteLog.class);

    public static void main(String[] args) throws InterruptedException {

        while (true) {
            // Log the current system timestamp every two seconds
            logger.info(new Date().getTime());
            //System.out.println(new Date().getTime());
            Thread.sleep(2000);
            // Throw and catch an exception so that an ERROR-level event is sent as well
            try {
                throw new Exception("exception msg");
            }
            catch (Exception e) {
                logger.error("error:" + e.getMessage());
            }
        }
    }
}
