设为首页 加入收藏

TOP

Flume数据推送(Push)
2019-01-22 02:12:04 】 浏览:74
Tags:Flume 数据 推送 Push

一直都是用flume的pull方式收集日志数据,为了便于以后对业务系统实时日志收集的扩展,今天研究了下push方式的日志收集。

1.首先agent端应该配置以下几种source(监听TCP/UDP端口):

syslogUdp(port)

监听Udp端口

syslogTcp(port)

监听Tcp端口

syslogTcp1(port)

只监听Tcp端口的一个链接

2.然后在业务端直接向所配置的端口发送数据即可

大致如下:

		Socket client = new Socket("10.64.49.198", 23456);
		OutputStream out = client.getOutputStream();
		String event = "<4>hello\n";
		out.write(event.getBytes());
		out.flush();
		out.close();
		client.close();

所发送的event格式必需为如下格式

“<n>event_content\n”
在collector端console输出内容如下

"{ syslogfacility : n/8 } { syslogseverity : n%8 } event_content"

其中

syslogfacility = n/8

syslogseverity = n%8

假如event的内容为:"<4>hello\n"

输出为:"{ syslogfacility :0 } { syslogseverity :4 } hello"

源码中具体解析类为com.cloudera.flume.handlers.syslog.SyslogWireExtractor

/**
   * This is basically a state machine implementation of the extract function.
   * It uses a DataInputStream instead of a string to avoid the cost of string
   * and character encoding
   */
  public Event extract(DataInputStream in) throws EventExtractException {
    Preconditions.checkNotNull(in);
    Mode m = Mode.START;
    StringBuilder prio = new StringBuilder();
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    byte b = 0;
    long cnt = 0;
    try {
      while (true) {
        b = in.readByte();
        cnt++;
        switch (m) {
        case START:
          if (b == '<') {
            m = Mode.PRIO;
          } else {
            m = Mode.ERR;
          }
          break;
        case PRIO:
          if (b == '>') {
            m = Mode.DATA;
          } else {
            char ch = (char) b;
            if (Character.isDigit(ch)) {
              prio.append(ch); // stay in PRIO mode
            } else {
              m = Mode.ERR;
            }
          }
          break;
        case DATA:
          if (b == '\n') {
            Event e = buildEvent(prio, baos);
            return e;
          }

          baos.write(b);
          break;
        case ERR:
          // read until we get to a \n
          if (b == '\n') {
            throw new EventExtractException(
                "Failed to extract syslog wire entry");
          }
          // stay in Mode.ERR;
          break;
        }
      }
    } catch (EOFException e) {
      switch (m) {
      case ERR:
        // end of stream but was in error state Throw extraction exception
        throw new EventExtractException("Failed to extract syslog wire entry");
      case DATA:
        // end of stream but had data, return it.
        return buildEvent(prio, baos);
      default:
        // if not in error state just return done;
        return null;
      }
    } catch (IOException e) {
      throw new EventExtractException("Failed to extract syslog wire entry: "
              + e.getMessage());
    }
  }

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇flume   三大核心组件 下一篇Windows64环境下   使用Flum..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目