[ERROR - org.apache.flume.sink.elasticsearch.ElasticSearchSink.configure(ElasticSearchSink.java:302)] Could not instantiate event serializer.
java.lang.ClassNotFoundException: org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:191)
at org.apache.flume.sink.elasticsearch.ElasticSearchSink.configure(ElasticSearchSink.java:286)
at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:413)
at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:98)
at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
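Cause:
The Elasticsearch dependency JARs are missing from Flume's classpath.
Solution:
1. Copy the JARs under Elasticsearch's lib directory into Flume's lib directory.
2. Add the Elasticsearch dependencies through the FLUME_CLASSPATH environment variable:
FLUME_CLASSPATH="/home/elk/elasticsearch-2.1.1/lib/*"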
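To confirm that the classpath change took effect before restarting the agent, a small check like the following can help. It is only a sketch: ClasspathCheck and isLoadable are hypothetical names, not part of Flume or Elasticsearch, and the two class names in main are simply the ones that appear in the stack traces of this post.

```java
// Hypothetical diagnostic helper; not part of Flume or Elasticsearch.
final class ClasspathCheck {

    // Returns true if the named class can be loaded by this class's class loader.
    static boolean isLoadable(String className) {
        try {
            // initialize=false: only load the class, do not run its static initializers.
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // LinkageError covers NoClassDefFoundError, which can be thrown
            // when a superclass or interface of the class is missing.
            return false;
        }
    }

    public static void main(String[] args) {
        // Class names passed on the command line override the defaults below.
        String[] names = args.length > 0 ? args : new String[] {
            "org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer",
            "org.elasticsearch.common.transport.InetSocketTransportAddress"
        };
        for (String name : names) {
            System.out.println(name + " -> " + (isLoadable(name) ? "found" : "MISSING"));
        }
    }
}
```

The check is only meaningful when it runs with the same classpath as the Flume agent, that is, Flume's lib directory plus whatever FLUME_CLASSPATH adds.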
Error 2
[ERROR - org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:253)] Unable to start SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@1c9f6ece counterGroup:{ name:null counters:{} } } - Exception follows.
java.lang.NoSuchMethodError: org.elasticsearch.common.transport.InetSocketTransportAddress.<init>(Ljava/lang/String;I)V
at org.apache.flume.sink.elasticsearch.client.ElasticSearchTransportClient.configureHostnames(ElasticSearchTransportClient.java:143)
at org.apache.flume.sink.elasticsearch.client.ElasticSearchTransportClient.<init>(ElasticSearchTransportClient.java:77)
at org.apache.flume.sink.elasticsearch.client.ElasticSearchClientFactory.getClient(ElasticSearchClientFactory.java:48)
at org.apache.flume.sink.elasticsearch.ElasticSearchSink.start(ElasticSearchSink.java:357)
at org.apache.flume.sink.DefaultSinkProcessor.start(DefaultSinkProcessor.java:46)
at org.apache.flume.SinkRunner.start(SinkRunner.java:79)
at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
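Cause:
The Elasticsearch version is too new, so its JARs are incompatible with Flume's Elasticsearch sink: the sink calls the InetSocketTransportAddress(String, int) constructor, which the Elasticsearch JARs on the classpath do not provide.
Solution: downgrade Elasticsearch to version 1.7.1.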
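The descriptor <init>(Ljava/lang/String;I)V in the NoSuchMethodError denotes a constructor that takes a String and an int. A reflection check along the following lines can show whether the Elasticsearch JARs currently on the classpath still offer that constructor. This is a sketch under assumptions: ConstructorCheck and hasPublicConstructor are hypothetical names introduced here for illustration, and the check has to run with the same classpath as the Flume agent.

```java
// Hypothetical diagnostic helper; not part of Flume or Elasticsearch.
final class ConstructorCheck {

    // Returns true if the class can be loaded and declares a public
    // constructor with exactly the given parameter types.
    static boolean hasPublicConstructor(String className, Class<?>... parameterTypes) {
        try {
            Class<?> type = Class.forName(className, false, ConstructorCheck.class.getClassLoader());
            type.getConstructor(parameterTypes); // throws NoSuchMethodException if absent
            return true;
        } catch (ClassNotFoundException | NoSuchMethodException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Class name and signature taken from the NoSuchMethodError in the log.
        String name = "org.elasticsearch.common.transport.InetSocketTransportAddress";
        boolean present = hasPublicConstructor(name, String.class, int.class);
        System.out.println(name + "(String, int): "
                + (present ? "present" : "absent, or class not on classpath"));
    }
}
```

If the constructor is reported as absent while the class itself is on the classpath, the sink and the Elasticsearch client library are mismatched, which is the situation this error describes.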
6. Java test code
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.util.Date;

public class WriteLog {
    protected static final Log logger = LogFactory.getLog(WriteLog.class);

    public static void main(String[] args) throws InterruptedException {
        while (true) {
            // Log the current system timestamp (in milliseconds) every two seconds.
            logger.info(new Date().getTime());
            //System.out.println(new Date().getTime());
            Thread.sleep(2000);
            try {
                // Thrown deliberately so that an ERROR-level event is logged as well.
                throw new Exception("exception msg");
            } catch (Exception e) {
                logger.error("error:" + e.getMessage());
            }
        }
    }
}