Flume 1.5.2: agent-shutdown-hook appears in the log, then the process exits
2018-12-02 18:38:28

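I have recently been using Flume to collect logs and found that the flume process dies every few hours, yet the log shows no Error or Exception leading up to it. Some of the log lines from just before the process exits: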

09 Apr 2015 14:47:21,160 INFO  [agent-shutdown-hook] (org.apache.flume.lifecycle.LifecycleSupervisor.stop:79)  - Stopping lifecycle supervisor 11
09 Apr 2015 14:47:21,162 INFO  [agent-shutdown-hook] (org.apache.flume.node.PollingPropertiesFileConfigurationProvider.stop:83)  - Configuration provider stopping
09 Apr 2015 14:47:21,177 INFO  [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.channel.file.Log.roll:932)  - Roll start /maichuang/flume/channel/data
09 Apr 2015 14:47:21,177 INFO  [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.channel.file.LogFile$Writer.<init>:214)  - Opened /maichuang/flume/channel/data/log-15
09 Apr 2015 14:47:21,178 INFO  [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.channel.file.Log.roll:948)  - Roll end
09 Apr 2015 14:47:21,210 INFO  [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.channel.file.Log.roll:932)  - Roll start /maichuang/flume/channel/data
09 Apr 2015 14:47:21,211 INFO  [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.channel.file.LogFile$Writer.<init>:214)  - Opened /maichuang/flume/channel/data/log-16
09 Apr 2015 14:47:21,211 INFO  [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.channel.file.Log.roll:948)  - Roll end
09 Apr 2015 14:47:21,211 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:160)  - Unable to deliver event. Exception follows.
org.apache.flume.ChannelException: Commit failed due to IO error [channel=channel1]
        at org.apache.flume.channel.file.FileChannel$FileBackedTransaction.doRollback(FileChannel.java:605)
        at org.apache.flume.channel.BasicTransactionSemantics.rollback(BasicTransactionSemantics.java:168)
        at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:466)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.nio.channels.ClosedByInterruptException
        at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
        at sun.nio.ch.FileChannelImpl.force(FileChannelImpl.java:380)
        at org.apache.flume.channel.file.LogFileV3.writeDelimitedTo(LogFileV3.java:148)
        at org.apache.flume.channel.file.LogFileV3$Writer.<init>(LogFileV3.java:209)
        at org.apache.flume.channel.file.LogFileFactory.getWriter(LogFileFactory.java:77)
        at org.apache.flume.channel.file.Log.roll(Log.java:935)
        at org.apache.flume.channel.file.Log.roll(Log.java:904)
        at org.apache.flume.channel.file.Log.rollback(Log.java:719)
        at org.apache.flume.channel.file.FileChannel$FileBackedTransaction.doRollback(FileChannel.java:603)
        ... 5 more
09 Apr 2015 14:47:21,212 INFO  [agent-shutdown-hook] (org.apache.flume.sink.hdfs.HDFSEventSink.stop:499)  - Closing hdfs://master:9000/flume/2015_04_09-1446.log
09 Apr 2015 14:47:21,213 INFO  [agent-shutdown-hook] (org.apache.flume.sink.hdfs.BucketWriter.close:409)  - Closing hdfs://master:9000/flume/_2015_04_09-1446.log.1428562035496.tmp
09 Apr 2015 14:47:21,214 INFO  [hdfs-master-call-runner-9] (org.apache.flume.sink.hdfs.BucketWriter$3.call:339)  - Close tries incremented
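
One observation on the excerpt above: the only ERROR is logged after the first agent-shutdown-hook lines, and its root cause is java.nio.channels.ClosedByInterruptException. The JDK throws that exception when a thread doing I/O on an interruptible channel (a FileChannel, for example) is interrupted. So the error may well be a side effect of the shutdown rather than its cause, although the log alone does not prove that. Below is a minimal Java sketch of the JDK behavior only. It does not use any Flume code, and the class name InterruptedChannelDemo and the temp file are made up for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical demo class; not part of Flume.
class InterruptedChannelDemo {

    // Returns true if writing to a FileChannel from a thread whose interrupt
    // flag is set fails with ClosedByInterruptException.
    static boolean writeWhileInterrupted() {
        Path tmp = null;
        try {
            tmp = Files.createTempFile("interrupt-demo", ".log");
            try (FileChannel ch = FileChannel.open(tmp, StandardOpenOption.WRITE)) {
                // Set the interrupt flag, as happens when another thread
                // calls interrupt() on a worker thread.
                Thread.currentThread().interrupt();
                ch.write(ByteBuffer.wrap("event".getBytes(StandardCharsets.UTF_8)));
                return false; // reached only if no exception was thrown
            }
        } catch (ClosedByInterruptException e) {
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            // Clear the interrupt flag so later code is unaffected, then clean up.
            Thread.interrupted();
            if (tmp != null) {
                tmp.toFile().delete();
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("ClosedByInterruptException thrown: " + writeWhileInterrupted());
    }
}
```

If this reading is right, the stack trace would be a symptom to ignore while searching for whatever asked the JVM to shut down in the first place.
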
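Although I don't know Java well, I braced myself and went to read the source of

org.apache.flume.lifecycle.LifecycleSupervisor

That did not seem to be where the problem was. I then figured I should look for anything related to agent-shutdown-hook, and sure enough I found it: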

http://www.codatlas.com/github.com/apache/flume/HEAD/flume-ng-node/src/main/java/org/apache/flume/node/Application.java?line=345

public static void main(String[] args) {

    try {

      boolean isZkConfigured = false;

      Options options = new Options();

      Option option = new Option("n", "name", true, "the name of this agent");
      option.setRequired(true);
      options.addOption(option);

      option = new Option("f", "conf-file", true,
          "specify a config file (required if -z missing)");
      option.setRequired(false);
      options.addOption(option);

      option = new Option(null, "no-reload-conf", false,
          "do not reload config file if changed");
      options.addOption(option);

      // Options for Zookeeper
      option = new Option("z", "zkConnString", true,
          "specify the ZooKeeper connection to use (required if -f missing)");
      option.setRequired(false);
      options.addOption(option);

      option = new Option("p", "zkBasePath", true,
          "specify the base path in ZooKeeper for agent configs");
      option.setRequired(false);
      options.addOption(option);

      option = new Option("h", "help", false, "display help text");
      options.addOption(option);

      CommandLineParser parser = new GnuParser();
      CommandLine commandLine = parser.parse(options, args);

      if (commandLine.hasOption('h')) {
        new HelpFormatter().printHelp("flume-ng agent", options, true);
        return;
      }

      String agentName = commandLine.getOptionValue('n');
      boolean reload = !commandLine.hasOption("no-reload-conf");

      if (commandLine.hasOption('z') || commandLine.hasOption("zkConnString")) {
        isZkConfigured = true;
      }
      Application application = null;
      if (isZkConfigured) {
        // get options
        String zkConnectionStr = commandLine.getOptionValue('z');
        String baseZkPath = commandLine.getOptionValue('p');

        if (reload) {
          EventBus eventBus = new EventBus(agentName + "-event-bus");
          List<LifecycleAware> components = Lists.newArrayList();
          PollingZooKeeperConfigurationProvider zookeeperConfigurationProvider =
            new PollingZooKeeperConfigurationProvider(
              agentName, zkConnectionStr, baseZkPath, eventBus);
          components.add(zookeeperConfigurationProvider);
          application = new Application(components);
          eventBus.register(application);
        } else {
          StaticZooKeeperConfigurationProvider zookeeperConfigurationProvider =
            new StaticZooKeeperConfigurationProvider(
              agentName, zkConnectionStr, baseZkPath);
          application = new Application();
          application.handleConfigurationEvent(zookeeperConfigurationProvider
            .getConfiguration());
        }
      } else {
        File configurationFile = new File(commandLine.getOptionValue('f'));

        /*
         * The following is to ensure that by default the agent will fail on
         * startup if the file does not exist.
         */
        if (!configurationFile.exists()) {
          // If command line invocation, then need to fail fast
          if (System.getProperty(Constants.SYSPROP_CALLED_FROM_SERVICE) ==
            null) {
            String path = configurationFile.getPath();
            try {
              path = configurationFile.getCanonicalPath();
            } catch (IOException ex) {
              logger.error("Failed to read canonical path for file: " + path,
                ex);
            }
            throw new ParseException(
              "The specified configuration file does not exist: " + path);
          }
        }
        List<LifecycleAware> components = Lists.newArrayList();

        if (reload) {
          EventBus eventBus = new EventBus(agentName + "-event-bus");
          PollingPropertiesFileConfigurationProvider configurationProvider =
            new PollingPropertiesFileConfigurationProvider(
              agentName, configurationFile, eventBus, 30);
          components.add(configurationProvider);
          application = new Application(components);
          eventBus.register(application);
        } else {
          PropertiesFileConfigurationProvider configurationProvider =
            new PropertiesFileConfigurationProvider(
              agentName, configurationFile);
          application = new Application();
          application.handleConfigurationEvent(configurationProvider
            .getConfiguration());
        }
      }
      application.start();

      final Application appReference = application;
      // Highlighted in red in the original post.
      Runtime.getRuntime().addShutdownHook(new Thread("agent-shutdown-hook") {
        @Override
        public void run() {
          appReference.stop();
        }
      });

    } catch (Exception e) {
      logger.error("A fatal error occurred while running. Exception follows.",
          e);
    }
  }

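The addShutdownHook block above (highlighted in red in the original post) should be where the code that ends the process lives.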

So I looked up Runtime.getRuntime().addShutdownHook, and at

http://www.codatlas.com/github.com/openjdk-mirror/jdk7u-jdk/master/src/share/classes/java/lang/Runtime.java?keyword=getRuntime&line=209

I found:

    public void addShutdownHook(Thread hook) {
        SecurityManager sm = System.getSecurityManager();
        if (sm != null) {
            sm.checkPermission(new RuntimePermission("shutdownHooks"));
        }
        ApplicationShutdownHooks.add(hook);
    }

Related code also appears in ApplicationShutdownHooks.java (http://www.codatlas.com/github.com/openjdk-mirror/jdk7u-jdk/HEAD/src/share/classes/java/lang/ApplicationShutdownHooks.java?line=83):

    static synchronized void add(Thread hook) {
        if(hooks == null)
            throw new IllegalStateException("Shutdown in progress");

        if (hook.isAlive())
            throw new IllegalArgumentException("Hook already running");

        if (hooks.containsKey(hook))
            throw new IllegalArgumentException("Hook previously registered");

        hooks.put(hook, hook);
    }
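
Judging from the two JDK snippets above, addShutdownHook only stores the thread in a map; nothing in them starts it. According to the Runtime.addShutdownHook documentation, registered hooks are started when the JVM begins to shut down: when the last non-daemon thread exits, when System.exit is called, or when the process is terminated from outside (Ctrl+C, for example, or a system-wide event such as user logoff). The following is a minimal sketch of that behavior, not Flume's code. The class ShutdownHookDemo, the field hookRan and the thread name demo-shutdown-hook are made up for illustration.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class; not part of Flume.
class ShutdownHookDemo {

    // Set to true by the hook so we can observe when it actually runs.
    static final AtomicBoolean hookRan = new AtomicBoolean(false);

    // Registers a hook in the same style as Application.main and returns it.
    static Thread registerHook() {
        Thread hook = new Thread("demo-shutdown-hook") {
            @Override
            public void run() {
                hookRan.set(true);
                System.out.println("hook running: the JVM is shutting down");
            }
        };
        // Registration only: the thread is stored, not started.
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }

    public static void main(String[] args) throws InterruptedException {
        registerHook();
        System.out.println("hook ran right after registration? " + hookRan.get());
        // Pretend to do work; the hook stays idle during this time.
        Thread.sleep(1000);
        System.out.println("main is returning; the JVM will now start the hook");
    }
}
```

Read this way, an agent-shutdown-hook line in the Flume log would mean that something had already asked the JVM to exit; the hook reacts to the shutdown rather than causing it. What triggered the shutdown in this particular case cannot be determined from the code alone.
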
In addition, the Application.java source contains this:

public synchronized void stop() {
    supervisor.stop();
    if(monitorServer != null) {
      monitorServer.stop();
    }
  }

Next I wanted to read the source behind supervisor.stop(). From this constructor in Application.java

public Application(List<LifecycleAware> components) {
    this.components = components;
    supervisor = new LifecycleSupervisor();
  }

we can see that supervisor = new LifecycleSupervisor(), i.e. supervisor is an instance of LifecycleSupervisor, so I turned to the LifecycleSupervisor source:

http://www.codatlas.com/github.com/apache/flume/HEAD/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java?line=51

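In the Application.java code there is this:

      application.start();
      // Question (in red in the original post): the code below runs right after
      // the application is started?? Doesn't that mean stop() would always be
      // executed? I have not worked out the logic here yet and hope an expert
      // can help explain.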
      final Application appReference = application;
      Runtime.getRuntime().addShutdownHook(new Thread("agent-shutdown-hook") {
        @Override
        public void run() {
          appReference.stop();
        }
      });
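I still have not figured out the question in the comment above (shown in red in the original post). I would appreciate it if someone could leave a comment to point me in the right direction, or email me at daiyongtao0211@qq.com. Thanks!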