设为首页 加入收藏

TOP

Flume 自定义source   -- SQLSource (转为 json 格式)
2019-05-14 14:09:17 】 浏览:15
Tags:Flume 定义 source   SQLSource 转为 json 格式
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/github_37643896/article/details/87258584

个人使用flume 相对较多 ,对他的采集任务比较喜欢 ,自己做了一些数据库方向的的拓展。

虽然 github 上 有很多 自定义的 flume sql-source 比如 大名鼎鼎 的https://github.com/keedio/flume-ng-sql-source

但是 我个人在使用的过程中遇到了一些问题 也一直没有得到解决 ,https://github.com/keedio/flume-ng-sql-source/issues/59

并且 格式 是csv 格式 。

所以我自己写了一套 sqlsource 基于原生的jdbc 连接 ,没有使用 其他的框架 ,读取数据库 转为 json 格式 。也便于解析个人 项目 地址,欢迎提出不足 ,有问题 直接 提issues。我会尽快帮大家解决 。

一、
在使用flume采集日志时,可以通过flume进行监控某一个文件把生产的数据传输给指定的sink,但是如果某段时间flume所在机器宕机了,那么当重新启动后,在去监控时,会导致有数据丢失,不是接着上一次的数据继续进行读取,因此针对这种情况时可能需要我们自定义一个source,记录偏移量,每次都是接着上次继续读,记录 数据已经发送的位置 。
二、
下面就是具体实现的代码
再写代码时可以参照官方给的source的源码进行编写.
flume的生命周期: 先执行构造器,再执行 config方法 --> start方法 --> processor.process–> stop

读取配置文件 -> 初始化 数据库连接及 相关参数 -> 解析 sql -> 获取 resultsSet -> 转为 Json -> 发送个channel -> stop -> 记录当前位置

具体 详见https://github.com/HbnKing/Flume-ng-Database/blob/master/RDB/src/main/java/com/hbn/rdb/source/SQLSource.java

三 、

配置文件

这里采用 loggersink 主要看一下Source 的 一些相关 配置

支持 自定义 sql

f2.sources = r1
f2.channels = c1
f2.sinks = k1

# 这里用 自己定义的 SQLSource
f2.sources.r1.type = com.hbn.rdb.source.SQLSource
f2.sources.r1.connectionurl = jdbc:oracle:thin:@//ip:1521/orcl
f2.sources.r1.user = yyj
f2.sources.r1.password = yyj
f2.sources.r1.driverclass= oracle.jdbc.driver.OracleDriver
f2.sources.r1.filepath = /var/log/sqllog
f2.sources.r1.filename = sqlSource.status
#f2.sources.r1.customquery = select a.id,a.COUPON_id,b.id from USER_COUPON_CODE_1 a ,COUPON_CODE b  where a.COUPON_id = b.id
#f2.sources.r1.customquery = select a.id,a.COUPON_id,b.id from USER_COUPON_CODE_1 a ,COUPON_CODE b  where a.COUPON_id = b.id  and  a.id > $@$
f2.sources.r1.customquery = select * from USER_COUPON_CODE_1
f2.sources.r1.begin = 0
f2.sources.r1.autoincrementfield = a.id
f2.sources.r1.batchsize = 1000
#具体定义channel
f2.channels.c1.type = memory
f2.channels.c1.capacity = 1000
f2.channels.c1.transactionCapacity = 100
#具体定义sink
f2.sinks.k1.type = logger
#组装source、channel、sink
f2.sources.r1.channels = c1
f2.sinks.k1.channel = c1

四 、

启动命令

bin/flume-ng agent --conf conf/ --conf-file conf/flume-conf1.properties --name f2-Dflume.root.logger=INFO,console

五 、

查看结果


{  "ID" : NumberLong("10000900000"), "COUPON_ID" : 900000, "ID_3" : NumberLong("10000900000") }
{ "ID" : NumberLong("10000700000"), "COUPON_ID" : 700000, "ID_3" : NumberLong("10000700000") }
{  "ID" : NumberLong("10000300000"), "COUPON_ID" : 300000, "ID_3" : NumberLong("10000300000") }
{ "ID" : NumberLong("10000800000"), "COUPON_ID" : 800000, "ID_3" : NumberLong("10000800000") }
{ "ID" : NumberLong("10000500001"), "COUPON_ID" : 500001, "ID_3" : NumberLong("10000500001") }
{  "ID" : NumberLong("10000500002"), "COUPON_ID" : 500002, "ID_3" : NumberLong("10000500002") }
{  "ID" : NumberLong("10000500003"), "COUPON_ID" : 500003, "ID_3" : NumberLong("10000500003") }

六 、

其他

如有问题 ,请留言 ,欢迎提出不足 和 建议 。

感谢贡献 。


编程开发网
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇flume读取日志数据写入kafka &nbs.. 下一篇flume   三大核心组件

评论

帐  号: 密码: (新用户注册)
验 证 码:
表  情:
内  容:

array(4) { ["type"]=> int(8) ["message"]=> string(24) "Undefined variable: jobs" ["file"]=> string(32) "/mnt/wp/cppentry/do/bencandy.php" ["line"]=> int(214) }