Copyright notice: this is an original article by the blogger and may not be reproduced without the blogger's permission. https://blog.csdn.net/chongxin1/article/details/76104075
Versions:
RedHat 6.5, JDK 1.8, flume-1.6.0, kafka_2.11-0.8.2.1
Prerequisites: Flume is installed and Kafka is installed.
1. Download flume-ng-sql-source-1.4.3.jar
CSDN download link: http://download.csdn.net/detail/chongxin1/9892184
flume-ng-sql-source-1.4.3.jar is the supporting jar that Flume needs in order to connect to a database.
2. Copy flume-ng-sql-source-1.4.3.jar into Flume's lib directory
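3. Copy the Oracle driver jar into Flume's lib directory (this example uses an Oracle database)
The Oracle JDBC driver jars ship with the Oracle installation. Here they are under: D:\app\product\11.2.0\dbhome_1\jdbc\lib
Copy ojdbc5.jar from that directory into Flume's lib directory.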
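Steps 2 and 3 both come down to copying a jar into Flume's lib directory. A minimal sketch, assuming the Flume install path used elsewhere in this article and assuming both jars have already been transferred to the current directory on the Flume host (`install_jar` is a helper name made up for this example):

```shell
#!/bin/sh
# install_jar <jar-file> <lib-dir>
# Hypothetical helper: copies one jar into a lib directory and
# fails with a message if the jar or the directory is missing.
install_jar() {
    jar="$1"
    lib_dir="$2"
    if [ ! -f "$jar" ]; then
        echo "jar not found: $jar" >&2
        return 1
    fi
    if [ ! -d "$lib_dir" ]; then
        echo "lib directory not found: $lib_dir" >&2
        return 1
    fi
    cp "$jar" "$lib_dir/"
}

# Assumption: Flume lives where the rest of this article puts it.
FLUME_HOME="${FLUME_HOME:-/usr/local/flume/apache-flume-1.6.0-bin}"

# Usage, assuming both jars are in the current directory:
#   install_jar flume-ng-sql-source-1.4.3.jar "$FLUME_HOME/lib"
#   install_jar ojdbc5.jar "$FLUME_HOME/lib"
```

Checking for the jar and the directory first avoids a silent typo in the path, which would otherwise only surface later as a class-not-found error when the agent starts.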
4. Create flume-sql.conf
Create flume-sql.conf in the conf directory:
- touch /usr/local/flume/apache-flume-1.6.0-bin/conf/flume-sql.conf
- sudo gedit /usr/local/flume/apache-flume-1.6.0-bin/conf/flume-sql.conf
Enter the following in flume-sql.conf:
- agentOne.channels=channelOne
- agentOne.sources=sourceOne
- agentOne.sinks=sinkOne
- ########### sql source #################
- # For each one of the sources, the type is defined
- agentOne.sources.sourceOne.type=org.keedio.flume.source.SQLSource
- agentOne.sources.sourceOne.hibernate.connection.url=jdbc:oracle:thin:@192.168.168.100:1521/orcl
- # Hibernate Database connection properties
- agentOne.sources.sourceOne.hibernate.connection.user=flume
- agentOne.sources.sourceOne.hibernate.connection.password=1234
- agentOne.sources.sourceOne.hibernate.connection.autocommit=true
- agentOne.sources.sourceOne.hibernate.dialect=org.hibernate.dialect.Oracle10gDialect
- agentOne.sources.sourceOne.hibernate.connection.driver_class=oracle.jdbc.driver.OracleDriver
- agentOne.sources.sourceOne.run.query.delay=10000
- agentOne.sources.sourceOne.status.file.path=/tmp
- agentOne.sources.sourceOne.status.file.name=sqlSource.status
- # Custom query
- agentOne.sources.sourceOne.start.from=0
- agentOne.sources.sourceOne.custom.query=select sysdate from dual
- agentOne.sources.sourceOne.batch.size=1000
- agentOne.sources.sourceOne.max.rows=1000
- agentOne.sources.sourceOne.hibernate.connection.provider_class=org.hibernate.connection.C3P0ConnectionProvider
- agentOne.sources.sourceOne.hibernate.c3p0.min_size=1
- agentOne.sources.sourceOne.hibernate.c3p0.max_size=10
- ##############################
- agentOne.channels.channelOne.type=memory
- agentOne.channels.channelOne.capacity=10000
- agentOne.channels.channelOne.transactionCapacity=10000
- agentOne.channels.channelOne.byteCapacityBufferPercentage=20
- agentOne.channels.channelOne.byteCapacity=800000
- agentOne.sinks.sinkOne.type=org.apache.flume.sink.kafka.KafkaSink
- agentOne.sinks.sinkOne.topic=test
- agentOne.sinks.sinkOne.brokerList=192.168.168.200:9092
- agentOne.sinks.sinkOne.requiredAcks=1
- agentOne.sinks.sinkOne.batchSize=20
- agentOne.sinks.sinkOne.channel=channelOne
- agentOne.sources.sourceOne.channels=channelOne
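The last lines of the file bind the sink and the source to the channel. Note that a source uses the plural key `channels` while a sink uses the singular `channel`, which is easy to get wrong. The following is a rough sketch of a pre-flight check for that binding. `check_wiring` is a hypothetical helper written for this article, not part of Flume, and it only understands the simple `key=value` layout used in this file.

```shell
#!/bin/sh
# check_wiring <conf-file> <agent-name>
# Hypothetical helper (not part of Flume): returns 0 only if every
# source listed in <agent>.sources has a ".channels" line and every
# sink listed in <agent>.sinks has a ".channel" line.
check_wiring() {
    conf="$1"
    agent="$2"
    status=0

    # Pull the space-separated names out of "<agent>.sources=..." and
    # "<agent>.sinks=...".
    sources=$(sed -n "s/^[[:space:]]*$agent\.sources[[:space:]]*=[[:space:]]*//p" "$conf")
    sinks=$(sed -n "s/^[[:space:]]*$agent\.sinks[[:space:]]*=[[:space:]]*//p" "$conf")

    for name in $sources; do
        if ! grep -Eq "^[[:space:]]*$agent\.sources\.$name\.channels[[:space:]]*=" "$conf"; then
            echo "source $name is not bound to any channel" >&2
            status=1
        fi
    done

    for name in $sinks; do
        if ! grep -Eq "^[[:space:]]*$agent\.sinks\.$name\.channel[[:space:]]*=" "$conf"; then
            echo "sink $name is not bound to a channel" >&2
            status=1
        fi
    done

    return $status
}

# Usage with the file and agent name from this article:
#   check_wiring /usr/local/flume/apache-flume-1.6.0-bin/conf/flume-sql.conf agentOne
```

The check is deliberately narrow: it does not verify that the named channel is declared, nor does it validate any other property.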
5. Start Flume with flume-sql.conf and test
- cd /usr/local/flume/apache-flume-1.6.0-bin
- bin/flume-ng agent --conf conf --conf-file conf/flume-sql.conf --name agentOne -Dflume.root.logger=INFO,console
A successful start logs the following:
- 2017-07-08 00:12:55,393 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:120)] Monitored counter group for type: SINK, name: sinkOne: Successfully registered new MBean.
- 2017-07-08 00:12:55,394 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)] Component type: SINK, name: sinkOne started
- 2017-07-08 00:12:55,463 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Fetching metadata from broker id:0,host:localhost,port:9092 with correlation id 0 for 1 topic(s) Set(test)
- 2017-07-08 00:12:55,528 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Connected to localhost:9092 for producing
- 2017-07-08 00:12:55,551 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Disconnecting from localhost:9092
- 2017-07-08 00:12:55,582 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Connected to slave2:9092 for producing
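Before attaching a consumer, it can be worth confirming that the topic named in `agentOne.sinks.sinkOne.topic` exists. A minimal sketch, assuming `kafka-topics.sh` from the Kafka distribution is on the `PATH` and ZooKeeper listens on `localhost:2181`, the address used by the consumer command in this article (`topic_exists` is a hypothetical helper name):

```shell
#!/bin/sh
# topic_exists <zookeeper-host:port> <topic>
# Hypothetical helper: succeeds if kafka-topics.sh lists the topic.
topic_exists() {
    # --list prints one topic name per line; -x matches the whole line
    # and -F treats the topic name as a literal string, not a pattern.
    kafka-topics.sh --list --zookeeper "$1" | grep -qxF "$2"
}

# Usage with the values from this article:
#   topic_exists localhost:2181 test && echo "topic test exists"
```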
Start a Kafka consumer to listen on the topic:
- kafka-console-consumer.sh --zookeeper localhost:2181 --topic test
A successful run prints the following:
- [root@master kafka_2.11-0.9.0.0]# kafka-console-consumer.sh --zookeeper localhost:2181 --topic test
- "2017-07-08 00:28:53.0"
- "2017-07-08 00:29:03.0"
- "2017-07-08 00:29:13.0"
- "2017-07-08 00:29:23.0"
- "2017-07-08 00:29:33.0"
- "2017-07-08 00:29:43.0"
- "2017-07-08 00:29:53.0"
- "2017-07-08 00:30:03.0"
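6. Common errors and how to fix them
- 2017-06-27 16:26:01,293 (C3P0PooledConnectionPoolManager[identityToken->1hgey889o1sjxqn51anc3fr|29938ba5]-AdminTaskTimer) [WARN - com.mchange.v2.async.ThreadPoolAsynchronousRunner$DeadlockDetector.run(ThreadPoolAsynchronousRunner.java:759)] com.mchange.v2.async.ThreadPoolAsynchronousRunner$DeadlockDetector@2d6227f3 -- APPARENT DEADLOCK!!! Complete Status:
The connection attempt timed out, which leads to the deadlock warning. Check carefully that the URL jdbc:oracle:thin:@192.168.168.100:1521/orcl and the username/password are correct.
If they are correct and the connection still fails, check whether a firewall is enabled on the Oracle database host. If it is, add an inbound rule or simply turn the firewall off.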
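To tell a firewall problem apart from a wrong username or password, a plain TCP check against the host and port from the JDBC URL can help. A minimal sketch, assuming `bash` and the coreutils `timeout` command are available on the Flume host (`port_open` is a hypothetical helper name):

```shell
#!/bin/sh
# port_open <host> <port> [timeout-seconds]
# Hypothetical helper: succeeds if a TCP connection can be opened.
# It relies on bash's /dev/tcp redirection, so bash is invoked explicitly,
# and on timeout so that a silently dropped packet does not hang the check.
port_open() {
    timeout "${3:-3}" bash -c 'exec 3<>"/dev/tcp/$1/$2"' _ "$1" "$2" 2>/dev/null
}

# Usage with the host and port from the JDBC URL in this article:
#   if port_open 192.168.168.100 1521; then
#       echo "port reachable: recheck the URL, user and password"
#   else
#       echo "port unreachable: check the firewall on the database host"
#   fi
```

A reachable port only shows that the network path is open. It says nothing about whether the service name in the URL or the credentials are valid.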